Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-26 Thread Yang Wang
Usually, you should use the HDFS nameservice instead of the NameNode
hostname:port, so that a NameNode failover does not break your checkpoint path.
You can find the configured nameservice in hdfs-site.xml under the
key *dfs.nameservices*.


Best,
Yang
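As a sketch (the nameservice name "emr-cluster" is hypothetical; substitute the value of dfs.nameservices from your cluster's hdfs-site.xml), the checkpoint path would then look like:

```yaml
# flink-conf.yaml (sketch): point checkpoints at the logical HDFS nameservice.
# "emr-cluster" stands in for whatever dfs.nameservices contains; with a
# nameservice URI no port is given, and the HDFS client handles NN failover.
state.checkpoints.dir: hdfs://emr-cluster/flink-checkpoints
```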



Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-22 Thread Sachin Mittal
So, when we create an EMR cluster, the NN service runs on the primary node
of the cluster.
Now, at the time of creating the cluster, how can we specify the name of
this NN in the format hdfs://*namenode-host*:8020/ ?

Is there a standard name by which we can identify the NN server?

Thanks
Sachin




Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-22 Thread Asimansu Bera
Hello Sachin,

Typically, cloud VMs are ephemeral: if the EMR cluster goes down, or VMs are
shut down for security updates or due to faults, new VMs will be added to the
cluster. As a result, any data stored on the local file system, such as
file:///tmp, would be lost. To ensure persistence and prevent loss of the
checkpoint or savepoint data needed for recovery, it is advisable to store
such data in a persistent storage solution like HDFS or S3.

Generally, the Hadoop NN on EMR listens on port 8020. You can find the NN
host details from the EMR service.

Hope this helps.

-A
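To make the contrast concrete, a sketch of the two durable alternatives (bucket and host names are placeholders; on EMR the s3:// scheme is typically backed by EMRFS):

```yaml
# flink-conf.yaml (sketch): durable checkpoint locations instead of file:///tmp.
# HDFS survives the loss of individual VMs; S3 additionally survives the loss
# of the whole cluster.
state.checkpoints.dir: hdfs://namenode-host:8020/flink-checkpoints
# state.checkpoints.dir: s3://my-bucket/flink-checkpoints
```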




Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-21 Thread Sachin Mittal
Hi,
We are using AWS EMR where we can submit our flink jobs to a long running
flink cluster on Yarn.

We wanted to configure RocksDBStateBackend as our state backend to store
our checkpoints.

So we have configured the following properties in our flink-conf.yaml:

   - state.backend.type: rocksdb
   - state.checkpoints.dir: file:///tmp
   - state.backend.incremental: true


My question here is regarding the checkpoint location: what is the
difference between placing it on a local filesystem vs. the Hadoop
distributed file system (HDFS)?

What advantages do we get if we use:

*state.checkpoints.dir*: hdfs://namenode-host:port/flink-checkpoints
vs
*state.checkpoints.dir*: file:///tmp

Also, if we decide to use HDFS, then where can we get the value for
*namenode-host:port*,
given we are running Flink on EMR?

Thanks
Sachin


Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
This is our yarn related settings:

yarn.scheduler.fair.assignmultiple: "true"
yarn.scheduler.fair.dynamic.max.assign: "false"
yarn.scheduler.fair.max.assign: 1

Any suggestions?

Best
Lu



Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Thanks, I'll check it out!

Best
Lu



Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Biao Geng
Hi,

If your YARN cluster uses the fair scheduler, you can check whether the
yarn.scheduler.fair.assignmultiple<https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/FairScheduler.html>
config is set. If it is, adjusting
yarn.scheduler.fair.dynamic.max.assign and yarn.scheduler.fair.max.assign could
be helpful. Also, AFAIK, Flink does not exert any extra control over the
distribution of YARN apps across nodes. The key difference between Flink and
Spark here is that most Flink jobs are unbounded while Spark jobs are bounded,
so under the same YARN scheduling strategy the final distribution of apps after
some time can differ.

Best,
Biao Geng
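For context, these knobs live in yarn-site.xml on the ResourceManager. A sketch with illustrative values, not a recommended fix: with assignmultiple enabled and dynamic.max.assign disabled, max.assign caps how many containers a single node heartbeat can be assigned, which limits packing onto whichever node heartbeats first.

```xml
<!-- yarn-site.xml (sketch, illustrative values): fair-scheduler assignment knobs. -->
<property>
  <!-- allow assigning more than one container per node heartbeat -->
  <name>yarn.scheduler.fair.assignmultiple</name>
  <value>true</value>
</property>
<property>
  <!-- when false, the static max.assign cap below applies -->
  <name>yarn.scheduler.fair.dynamic.max.assign</name>
  <value>false</value>
</property>
<property>
  <!-- cap on containers assigned per heartbeat; low values spread containers -->
  <name>yarn.scheduler.fair.max.assign</name>
  <value>1</value>
</property>
```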



Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Hi, Thanks for all your help. Are there any other insights?


Best
Lu



Re: Uneven TM Distribution of Flink on YARN

2023-08-30 Thread Lu Niu
No. we don't use yarn.taskmanager.node-label

Best
Lu



Re: Uneven TM Distribution of Flink on YARN

2023-08-29 Thread Geng Biao
Maybe you can check if you have set yarn.taskmanager.node-label for some flink 
jobs?

Best,
Biao Geng

Sent from Outlook for iOS<https://aka.ms/o0ukef>



Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Chen Zhanghao
CCing @Weihua Hu <mailto:huweihua@gmail.com>, who is an expert on this. Do
you have any ideas on the phenomenon here?

Best,
Zhanghao Chen



Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Lu Niu
Thanks for your reply.

The interesting fact is that we also manage Spark on YARN; however, only
the Flink cluster is having this issue. I am wondering whether there is a
difference in the implementation on the Flink side.

Best
Lu



Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Chen Zhanghao
Hi Lu Niu,

TM distribution on YARN nodes is managed by YARN RM and is out of the scope of 
Flink. On the other hand, cluster.evenly-spread-out-slots forces even 
distribution of tasks among Flink TMs, and has nothing to do with your 
concerns. Also, the config currently only supports Standalone mode Flink 
clusters, and does not take effect on a Flink cluster on YARN.

Best,
Zhanghao Chen
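For reference, the option under discussion as it would appear in flink-conf.yaml (a sketch; per the explanation above, it balances slot usage across registered TaskManagers and only takes effect for standalone clusters, not on YARN):

```yaml
# flink-conf.yaml (sketch): prefer spreading slots across all registered TMs.
# This steers Flink's slot allocation only; it does not influence on which
# YARN nodes the TaskManager containers themselves are placed.
cluster.evenly-spread-out-slots: true
```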



Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Lu Niu
Thanks for the reply. We've already set cluster.evenly-spread-out-slots =
true

Best
Lu



Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Kenan Kılıçtepe
Have you checked the config param cluster.evenly-spread-out-slots?




Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Lu Niu
Hi, Flink users

We have recently observed that the allocation of Flink TaskManagers in our
YARN cluster is not evenly distributed. We would like to hear your thoughts
on this matter.

1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
2. The uneven distribution is that out of a 370-node YARN cluster, there
are 16 nodes with either 0 or 1 vCore available, while 110 nodes have more
than 10 vCores available.

Is such behavior expected? If not, is there a fix provided in Flink?
Thanks!

Best
Lu


Flink 1.17.1 on YARN: submission cannot find the configuration file

2023-08-01 Thread guanyq
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application -yjm 
1024m -ytm 1024m ./xx-1.0.jar ./config.properties
The submit command above specifies a configuration file; why is the file looked up inside the container? file 
/home/yarn/nm/usercache/root/appcache/application_1690773368385_0092/container_e183_1690773368385_0092_01_01/./config.properties
 does not exist
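One likely explanation: in yarn-application mode the job's main() runs inside the JobManager container, so a relative path like ./config.properties must first be shipped into the container. A hedged sketch (assuming the file sits in the submit directory; `yarn.ship-files` is the option for shipping local files, and the `-D` memory options replace the legacy `-yjm`/`-ytm` shortcuts):

```shell
# Ship config.properties into the YARN containers so that the
# relative path ./config.properties resolves inside the container.
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1024m \
  -Dyarn.ship-files=config.properties \
  ./xx-1.0.jar ./config.properties
```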

Re: flink on yarn: RocksDB memory overuse

2023-06-07 Thread Hangxiang Yu
Hi, at the moment the memory used by RocksDB is not strictly capped; see this ticket:
https://issues.apache.org/jira/browse/FLINK-15532
To locate the memory usage, start with some coarse-grained metrics:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
To further pin down the detailed memory usage of RocksDB inside a single instance,
you may need a malloc profiling tool, e.g. jemalloc's jeprof:
https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
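As a concrete starting point, the coarse metrics mentioned above can be enabled in flink-conf.yaml (a sketch; each enabled native metric adds some collection overhead):

```yaml
# Expose RocksDB native memory metrics per state instance.
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
```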

On Wed, Jun 7, 2023 at 4:58 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi all,
>   We have an application on Flink 1.13.5 on YARN with the RocksDB state
> backend. After running for a while the job exceeds its memory budget; increasing the overhead seems to alleviate it a bit. Is there an issue describing this kind of problem, and how can we locate which part of memory is over budget? Thanks
>
>
> crazy
> 2463829...@qq.com
>
>
>
> 



-- 
Best,
Hangxiang.


Re: Web UI don't show up In Flink on Yarn (Flink 1.17)

2023-05-25 Thread Weihua Hu
Hi,

yes i have tried ip directly.and no exception exists.but i found some info
> like the picture shows


Is there any response to the curl? How about enabling verbose logging for
curl with the command "curl -v xxx"

Best,
Weihua


On Thu, May 25, 2023 at 8:15 PM tan yao  wrote:

> and this info came from yarn resoucemanager
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> --
> *From:* tan yao 
> *Sent:* Thursday, May 25, 2023 8:14:45 PM
> *To:* Weihua Hu 
> *Cc:* user 
> *Subject:* Re: Web UI don't show up In Flink on Yarn (Flink 1.17)
>
> yes i have tried ip directly.and no exception exists.but i found some info
> like the picture shows
>
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> --
> *From:* Weihua Hu 
> *Sent:* Thursday, May 25, 2023 7:51:25 PM
> *To:* tan yao 
> *Cc:* user 
> *Subject:* Re: Web UI don't show up In Flink on Yarn (Flink 1.17)
>
> Hi,
>
> Are there any reported exceptions? Did you try using curl to query the
> rest API, such as "curl http://{ip:port}/overview"
>
> Best,
> Weihua
>
>
> On Thu, May 25, 2023 at 8:49 AM tan yao  wrote:
>
> Hi all,
> I find a strange thing with flink 1.17 deployed on yarn (CDH 6.x), flink
> web ui can not show up from yarn web link "ApplicationMaster",even typed
> jobmanager ip directly in browser .
>when i run wordcount application in flink 1.17 examples, and click yarn
> web "ApplicationMaster" link , the flink web ui just can not show up.But
> this works in flink 1.16 on the same env.
>
>


Re: Web UI don't show up In Flink on Yarn (Flink 1.17)

2023-05-25 Thread tan yao
and this info came from yarn resoucemanager

Get Outlook for iOS<https://aka.ms/o0ukef>

From: tan yao 
Sent: Thursday, May 25, 2023 8:14:45 PM
To: Weihua Hu 
Cc: user 
Subject: Re: Web UI don't show up In Flink on Yarn (Flink 1.17)

yes i have tried ip directly.and no exception exists.but i found some info like 
the picture shows


Get Outlook for iOS<https://aka.ms/o0ukef>

From: Weihua Hu 
Sent: Thursday, May 25, 2023 7:51:25 PM
To: tan yao 
Cc: user 
Subject: Re: Web UI don't show up In Flink on Yarn (Flink 1.17)

Hi,

Are there any reported exceptions? Did you try using curl to query the rest 
API, such as "curl http://{ip:port}/overview"

Best,
Weihua


On Thu, May 25, 2023 at 8:49 AM tan yao <einstan_m...@outlook.com> wrote:
Hi all,
  I find a strange thing with flink 1.17 deployed on yarn (CDH 6.x), flink web 
ui can not show up from yarn web link "ApplicationMaster",even typed jobmanager 
ip directly in browser .
   when i run wordcount application in flink 1.17 examples, and click yarn web 
"ApplicationMaster" link , the flink web ui just can not show up.But this works 
in flink 1.16 on the same env.


Re: Web UI don't show up In Flink on Yarn (Flink 1.17)

2023-05-25 Thread Weihua Hu
Hi,

Are there any reported exceptions? Did you try using curl to query the rest
API, such as "curl http://{ip:port}/overview"

Best,
Weihua


On Thu, May 25, 2023 at 8:49 AM tan yao  wrote:

> Hi all,
> I find a strange thing with flink 1.17 deployed on yarn (CDH 6.x), flink
> web ui can not show up from yarn web link "ApplicationMaster",even typed
> jobmanager ip directly in browser .
>when i run wordcount application in flink 1.17 examples, and click yarn
> web "ApplicationMaster" link , the flink web ui just can not show up.But
> this works in flink 1.16 on the same env.
>


Web UI don't show up In Flink on Yarn (Flink 1.17)

2023-05-24 Thread tan yao
Hi all,
  I've found a strange thing with Flink 1.17 deployed on YARN (CDH 6.x): the Flink web 
ui cannot be reached from the YARN "ApplicationMaster" link, even when the JobManager 
ip is typed directly into the browser.
   When I run the WordCount application from the Flink 1.17 examples and click the YARN 
"ApplicationMaster" link, the Flink web UI just does not show up. This works 
in Flink 1.16 on the same environment.


Re:Re: Re: Re: Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-13 Thread guanyq
Yesterday I simulated the power-outage scenario.
The timestamps of the 10 HA files are staggered, one every 5 seconds.
Not all chk-xxx are corrupted either; some can be used to start the job, which I also tried. The current
situation: after the YARN cluster restarts from the power outage, it first loops over the 10 HA files
trying to start the application; the HA files record the checkpoint metadata.
1. If all HA files are corrupted, the Flink application cannot be brought up even if the checkpoints themselves are intact.

What we want now is to keep at least one usable set of HA file plus its corresponding checkpoint on HDFS.
Currently a checkpoint is taken every 5 seconds and 10 are retained, yet corruption can still prevent
startup. 5 seconds * 10 = 50 seconds; we'd also like to know how long a retention window must be to
guarantee one uncorrupted HA-file/checkpoint pair.














At 2023-03-14 10:16:48, "Guojun Li"  wrote:
>Hi
>
>Please confirm whether the last modification times of these HA files are identical or staggered.
>
>Also, have you tried restoring from a specified chk-? Does it recover?
>
>Best,
>Guojun
>
>On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:
>
>> The Flink HA path is /tmp/flink/ha/
>> The Flink checkpoint path is /tmp/flink/checkpoint
>>
>>
>> I'm not sure yet whether the HA files are corrupted or all the checkpoints
>> are; that still needs to be verified with a simulation.
>>
>>
>>
>> It does try to recover from the 10 checkpoints; the log shows:
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>>
>>
>>
>> The detailed log follows (I omitted the repeated later parts; the only difference is that a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx is tried on each attempt):
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
>> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
>> java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
>> 

Re: Re: Re: Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-13 Thread Guojun Li
Hi

Please confirm whether the last modification times of these HA files are identical or staggered.

Also, have you tried restoring from a specified chk-? Does it recover?

Best,
Guojun

On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:

> The Flink HA path is /tmp/flink/ha/
> The Flink checkpoint path is /tmp/flink/checkpoint
>
>
> I'm not sure yet whether the HA files are corrupted or all the checkpoints are; that still needs to be verified with a simulation.
>
>
>
>
> It does try to recover from the 10 checkpoints; the log shows:
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>
>
>
> The detailed log follows (I omitted the repeated later parts; the only difference is that a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx is tried on each attempt):
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException: The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866, for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
> at
> 

Re:Re: Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread guanyq



Understood, thank you very much.








At 2023-03-13 16:57:18, "Weihua Hu"  wrote:
>The image didn't come through; you can upload it to an image host and paste the link on the mailing list.
>
>How often YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1]:
>the application only exits once the configured number of failures is reached within that interval.
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:
>
>> The image is in the attachment.
>> But in practice it exceeded 10 attempts...
>>
>>
>>
>>
>>
>>
At 2023-03-13 15:39:39, "Weihua Hu"  wrote:
>> >Hi,
>> >
>> >The image didn't come through.
>> >
>> >With this configuration, YARN should only launch the JobManager 10 times.
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>> >
>> >> Flink version 1.10, configured as follows:
>> >> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
>> >> Normally I'd expect YARN to try 10 times to start the Flink job and then fail if it cannot, but the YARN application page shows 11 attempts, as in the screenshot below.
>> >> Regarding appattempt_1678102326043_0006_000409:
>> >> 
>> >> doesn't each sequence number represent one attempt?
>> >>
>>
>>


Re: Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread Weihua Hu
The image didn't come through; you can upload it to an image host and paste the link on the mailing list.

How often YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1]:
the application only exits once the configured number of failures is reached within that interval.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
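Put together, the two options interact roughly like this (a flink-conf.yaml sketch; values are illustrative):

```yaml
# The application fails only when 10 AM attempts fail within the
# validity window below; attempts older than the window stop
# counting toward the limit, which is why more than 10 attempts
# in total can show up on the YARN application page.
yarn.application-attempts: 10
yarn.application-attempt-failures-validity-interval: 600000   # 10 min, in ms
```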

Best,
Weihua


On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:

> The image is in the attachment.
> But in practice it exceeded 10 attempts...
>
>
>
>
>
>
At 2023-03-13 15:39:39, "Weihua Hu"  wrote:
> >Hi,
> >
> >The image didn't come through.
> >
> >With this configuration, YARN should only launch the JobManager 10 times.
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
> >
> >> Flink version 1.10, configured as follows:
> >> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
> >> Normally I'd expect YARN to try 10 times to start the Flink job and then fail if it cannot, but the YARN application page shows 11 attempts, as in the screenshot below.
> >> Regarding appattempt_1678102326043_0006_000409:
> >> 
> >> doesn't each sequence number represent one attempt?
> >>
>
>


Re:Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread guanyq
The image is in the attachment.
But in practice it exceeded 10 attempts...
















At 2023-03-13 15:39:39, "Weihua Hu"  wrote:
>Hi,
>
>The image didn't come through.
>
>With this configuration, YARN should only launch the JobManager 10 times.
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>
>> Flink version 1.10, configured as follows:
>> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
>> Normally I'd expect YARN to try 10 times to start the Flink job and then fail if it cannot, but the YARN application page shows 11 attempts, as in the screenshot below.
>> Regarding appattempt_1678102326043_0006_000409:
>> 
>> doesn't each sequence number represent one attempt?
>>


Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread Weihua Hu
Hi,

The image didn't come through.

With this configuration, YARN should only launch the JobManager 10 times.

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:

> Flink version 1.10, configured as follows:
> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
> Normally I'd expect YARN to try 10 times to start the Flink job and then fail if it cannot, but the YARN application page shows 11 attempts, as in the screenshot below.
> Regarding appattempt_1678102326043_0006_000409:
> 
> doesn't each sequence number represent one attempt?
>


flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread guanyq
Flink version 1.10, configured as follows:
yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
Normally I'd expect YARN to try 10 times to start the Flink job and then fail if it cannot, but the YARN application page shows 11 attempts, as in the screenshot below.
Regarding appattempt_1678102326043_0006_000409: doesn't each sequence number represent one attempt?


Re:Re: Re: Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread guanyq
The Flink HA path is /tmp/flink/ha/
The Flink checkpoint path is /tmp/flink/checkpoint


I'm not sure yet whether the HA files are corrupted or all the checkpoints are; that still needs to be verified with a simulation.




It does try to recover from the 10 checkpoints; the log shows:
2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.


The detailed log follows (I omitted the repeated later parts; the only difference is that a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx is tried on each attempt):
2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException: The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866, for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
at 

Re: Re: Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread Weihua Hu
Hi

Generally, a power outage of the YARN cluster alone should not affect already-completed
historical checkpoints (the most recent checkpoint may have failed while being written to HDFS).

Do you have more detailed JobManager logs? First confirm how many completedCheckpoints Flink
retrieved during recovery, and which checkpoint it finally tried to restore from.

You can also try, as Yanfei suggested, restoring from a historical checkpoint specified as a savepoint.


Best,
Weihua


On Fri, Mar 10, 2023 at 10:38 AM guanyq  wrote:

> Incremental checkpointing is not enabled.
> The file corruption is from the startup log: it tried to start from the 10 checkpoints, but each attempt failed because of the corrupted block below.
> The error log is:
>
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-03-10 10:26:11, "Yanfei Lei"  wrote:
> >Hi, you can look under the configured checkpoint dir for historical chk-x
> folders; if you can find one, try manually specifying that chk to restart [1].
> >
> >> The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
>
> >Is incremental checkpointing enabled for the job? With incremental enabled, if a file of an earlier checkpoint is corrupted, later checkpoints built incrementally on it are affected.
> >
> >> About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.
> >Did you observe that there are no files under the checkpoint dir?
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >
> >guanyq  wrote on Fri, Mar 10, 2023 at 08:58:
> >>
> >> We are also thinking of using savepoints to handle the power-outage problem.
> >> But I still have a question:
> >> The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
> >> It is strange; maybe none of the 10 checkpoints were actually flushed to disk.
> >> I'd like to ask:
> >> About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> At 2023-03-10 08:47:11, "Shammon FY"  wrote:
> >> >Hi
> >> >
> >> >I think when recovery of a Flink job fails, the job itself can hardly
> >> determine that the failure was caused by things like corrupted checkpoint file blocks. If you have taken a savepoint for the job, you can try restoring from that savepoint.
> >> >
> >> >Best,
> >> >Shammon
> >> >
> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
> >> >
> >> >> Premises:
> >> >> 1. Flink is configured with high availability.
> >> >> 2. Flink retains 10 checkpoints.
> >> >> 3. The YARN cluster is configured for job recovery.
> >> >> Question:
> >> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, will it try to start from another checkpoint if the latest one has blocks corrupted by the outage?
> >> >>
> >> >>
> >> >>
> >> >>
> >
> >
> >
> >--
> >Best,
> >Yanfei
>


Re:Re: Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread guanyq
Incremental checkpointing is not enabled.
The file corruption is from the startup log: it tried to start from the 10 checkpoints, but each attempt failed because of the corrupted block below.
The error log is:

java.io.IOException: Got error, status message opReadBlock 
BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received 
exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, 
self=/ip:45534, remote=/ip:9866,
for file 
/tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, 
for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at 
org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
















At 2023-03-10 10:26:11, "Yanfei Lei"  wrote:
>Hi, you can look under the configured checkpoint dir for historical chk-x folders; if you can find one, try manually specifying that chk to restart [1].
>
>> The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
>Is incremental checkpointing enabled for the job? With incremental enabled, if a file of an earlier checkpoint is corrupted, later checkpoints built incrementally on it are affected.
>
>> About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.
>Did you observe that there are no files under the checkpoint dir?
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>
>guanyq  wrote on Fri, Mar 10, 2023 at 08:58:
>>
>> We are also thinking of using savepoints to handle the power-outage problem.
>> But I still have a question:
>> The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
>> It is strange; maybe none of the 10 checkpoints were actually flushed to disk.
>> I'd like to ask:
>> About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2023-03-10 08:47:11, "Shammon FY"  wrote:
>> >Hi
>> >
>> >I think when recovery of a Flink job fails, the job itself can hardly
>> >determine that the failure was caused by things like corrupted checkpoint file blocks. If you have taken a savepoint for the job, you can try restoring from that savepoint.
>> >
>> >Best,
>> >Shammon
>> >
>> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
>> >
>> >> Premises:
>> >> 1. Flink is configured with high availability.
>> >> 2. Flink retains 10 checkpoints.
>> >> 3. The YARN cluster is configured for job recovery.
>> >> Question:
>> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, will it try to start from another checkpoint if the latest one has blocks corrupted by the outage?
>> >>
>> >>
>> >>
>> >>
>
>
>
>-- 
>Best,
>Yanfei


Re: Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread Yanfei Lei
Hi, you can look under the configured checkpoint dir for historical chk-x folders; if you can find one, try manually specifying that chk to restart [1].

> The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
Is incremental checkpointing enabled for the job? With incremental enabled, if a file of an earlier checkpoint is corrupted, later checkpoints built incrementally on it are affected.

> About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.
Did you observe that there are no files under the checkpoint dir?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
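A hedged sketch of what manually specifying a chk to restart looks like (the paths and job id are illustrative; a retained chk-x directory is passed with -s exactly like a savepoint path):

```shell
# List the retained checkpoints for the job, then resume from one.
hdfs dfs -ls /tmp/flink/checkpoint/<job-id>/
flink run -s hdfs:///tmp/flink/checkpoint/<job-id>/chk-7085 -d ./my-job.jar
```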

guanyq  wrote on Fri, Mar 10, 2023 at 08:58:
>
> We are also thinking of using savepoints to handle the power-outage problem.
> But I still have a question:
> The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
> It is strange; maybe none of the 10 checkpoints were actually flushed to disk.
> I'd like to ask:
> About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-03-10 08:47:11, "Shammon FY"  wrote:
> >Hi
> >
> >I think when recovery of a Flink job fails, the job itself can hardly
> >determine that the failure was caused by things like corrupted checkpoint file blocks. If you have taken a savepoint for the job, you can try restoring from that savepoint.
> >
> >Best,
> >Shammon
> >
> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
> >
> >> Premises:
> >> 1. Flink is configured with high availability.
> >> 2. Flink retains 10 checkpoints.
> >> 3. The YARN cluster is configured for job recovery.
> >> Question:
> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, will it try to start from another checkpoint if the latest one has blocks corrupted by the outage?
> >>
> >>
> >>
> >>



-- 
Best,
Yanfei


Re:Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread guanyq
We are also thinking of using savepoints to handle the power-outage problem.
But I still have a question:
The job keeps 10 checkpoints, one every 5 seconds; after a sudden power outage, why are all of the checkpoints corrupted?
It is strange; maybe none of the 10 checkpoints were actually flushed to disk.
I'd like to ask:
About the mechanism of flushing checkpoints to disk: this should relate to HDFS writes; the Flink job reports the checkpoint as successful, yet HDFS has not persisted it.

















At 2023-03-10 08:47:11, "Shammon FY"  wrote:
>Hi
>
>I think when recovery of a Flink job fails, the job itself can hardly
>determine that the failure was caused by things like corrupted checkpoint file blocks. If you have taken a savepoint for the job, you can try restoring from that savepoint.
>
>Best,
>Shammon
>
>On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
>
>> Premises:
>> 1. Flink is configured with high availability.
>> 2. Flink retains 10 checkpoints.
>> 3. The YARN cluster is configured for job recovery.
>> Question:
>> After the YARN cluster restarts from a power outage, when recovering the Flink job, will it try to start from another checkpoint if the latest one has blocks corrupted by the outage?
>>
>>
>>
>>


Re: flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread Shammon FY
Hi

I think when recovery of a Flink job fails, the job itself can hardly determine
that the failure was caused by things like corrupted checkpoint file blocks. If you have taken a savepoint for the job, you can try restoring from that savepoint.

Best,
Shammon

On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:

> Premises:
> 1. Flink is configured with high availability.
> 2. Flink retains 10 checkpoints.
> 3. The YARN cluster is configured for job recovery.
> Question:
> After the YARN cluster restarts from a power outage, when recovering the Flink job, will it try to start from another checkpoint if the latest one has blocks corrupted by the outage?
>
>
>
>


flink on yarn: questions about abnormal power-outage recovery

2023-03-09 Thread guanyq
Premises:
1. Flink is configured with high availability.
2. Flink retains 10 checkpoints.
3. The YARN cluster is configured for job recovery.
Question:
After the YARN cluster restarts from a power outage, when recovering the Flink job, will it try to start from another checkpoint if the latest one has blocks corrupted by the outage?





Re: Flink on YARN: "TaskManager with id ... is no longer reachable" after running for a while

2023-02-16 Thread Shammon FY
Hi

The "unreachable" TM heartbeat above usually means the TM has exited; check the reason it exited.
For the checkpoint timeout below, check whether there is backpressure, look at the checkpoint execution time, and consider increasing the checkpoint timeout.
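For the timeout side, the relevant flink-conf.yaml knobs look roughly like this (a sketch; values are illustrative and should be sized to the state volume and backpressure behavior):

```yaml
# Give slow (e.g. backpressured) checkpoints more time before they
# are declared failed, and keep breathing room between checkpoints.
execution.checkpointing.timeout: 30min
execution.checkpointing.min-pause: 1min
```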

Best,
Shammon


On Thu, Feb 16, 2023 at 10:34 AM lxk  wrote:

> Hi, you can take a heap dump and analyze the memory.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-02-16 10:05:19, "Fei Han"  wrote:
> >@all
> >Hi all! My Flink version is 1.14.5 and the CDC version is 2.2.1. After running on YARN for a while, the following error appears:
> >org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with
> id container_e506_1673750933366_49579_01_02(
> hdp-server-010.yigongpin.com:8041) is no longer reachable. at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> ~[?:1.8.0_181] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5]
> >After the above error, the following checkpoint error also appears: org.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing. at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
> [flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
> >Could you advise how these two problems can be addressed? Is there a good approach?
>


Flink on yarn: "TaskManager with id ... is no longer reachable" after running for a while

2023-02-15 Thread Fei Han
@all
Hi all! My Flink version is 1.14.5 and the CDC version is 2.2.1. After running on YARN for a while, the following error appears:
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id 
container_e506_1673750933366_49579_01_02(hdp-server-010.yigongpin.com:8041) 
is no longer reachable. at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[?:1.8.0_181] at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[?:1.8.0_181] at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 ~[?:1.8.0_181] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5]
After the above error, the following checkpoint error also appears: org.apache.flink.runtime.checkpoint.CheckpointException:
 Checkpoint expired before completing. at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
 [flink-dist_2.12-1.14.5.jar:1.14.5] at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_181] at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_181] at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_181] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_181] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Could you advise how these two problems can be addressed? Is there a good approach?


Re: Deploy Flink on YARN or Kubernetes.

2022-12-20 Thread Biao Geng
Hi Ruibin,
I think it may be hard to say that one provider is always more recommended than
the other. The answer to your question depends heavily on your team's
technical stack, your platform, and your expectations for the new cluster.
I *cannot* give you definitive advice, but I want to share some observations of
my own and hope they provide more information for the discussion:
1. stability: Flink on YARN module and Hadoop ecosystem have developed for
a longer period of time than Flink on K8S and K8S ecosystem. The codebase
of Flink on YARN module is more stable and it could be easier to get
relevant information about YARN provider. But in production environment,
Flink on K8S is also used by lots of companies and I believe these 2
providers are all capable of processing most flink workloads.
2. popularity: K8s, or "cloud native", is getting more popular.
3. community support: no obvious difference but it is worthwhile to mention
that flink kubernetes operator is in rapid development and maybe you can
give it a try.
4. toolchains: these providers all have their solutions to log collection /
monitoring with metrics / resource scaling.
In summary, IMO, for Flink's usage there are currently no obvious advantages
of one provider compared with the other. Most mid-sized users I see are still
using YARN for their big data. But K8s may make it easier to utilize your
resources fully when you have other workloads (e.g. Spark batch jobs), which
can reduce cost. When FLIP-271
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling> on
autoscaling is completed, Flink on K8s may gain a killer
advantage.

Best,
Biao Geng

Ruibin Xing  于2022年12月19日周一 14:54写道:

> Hi all,
>
> We are currently setting up a new Flink cluster and are trying to decide
> on the best deployment method. As far as we know, Flink supports two
> resource providers: YARN and Kubernetes. We are having difficulty
> evaluating the pros and cons of each provider, particularly in terms of
> stability, popularity, community support, and toolchains.
>
> Could someone please share their experience with these resource providers
> and provide a recommendation for our new cluster?
>
> Best, Ruibin
>
>


Re: Deploy Flink on YARN or Kubernetes.

2022-12-20 Thread Márton Balassi
Hi Ruibin,

Given that you are starting fresh I would recommend going with Kubernetes
and specifically checking out the Flink Kubernetes Operator. [1] I have
worked with Yarn for years before I transitioned to Kubernetes a year ago
and I am pleased that we made the jump. To address your points at a very high
level:

1. Stability: By now both Yarn and Kubernetes are quite stable.
2. Popularity: My educated guess is that Kubernetes is the more popular
choice by now.
3. Community support: Kubernetes wins here imho, both in terms of the
provider itself and in terms of the Flink integration. See my recent,
admittedly biased tweet thread. [2]
4. Toolchains: Hands down Kubernetes. One slight note is that traditionally
the Kubernetes toolchain is in Golang, which raises the barrier of entry
for Java/Scala folks. By now the Java toolchain for Kubernetes has grown
quite a bit though, we are using tools like fabric8 [3] and the Java
Operator SDK [4] extensively.

Honourable mention is that given the declarative REST APIs of Kubernetes
debugging applications is significantly easier than on Yarn.

[1] https://github.com/apache/flink-kubernetes-operator
[2] https://mobile.twitter.com/MartonBalassi/status/1603498076882214925
[3] https://github.com/fabric8io/kubernetes-client
[4] https://javaoperatorsdk.io/

On Mon, Dec 19, 2022 at 7:56 AM Ruibin Xing  wrote:

> Hi all,
>
> We are currently setting up a new Flink cluster and are trying to decide
> on the best deployment method. As far as we know, Flink supports two
> resource providers: YARN and Kubernetes. We are having difficulty
> evaluating the pros and cons of each provider, particularly in terms of
> stability, popularity, community support, and toolchains.
>
> Could someone please share their experience with these resource providers
> and provide a recommendation for our new cluster?
>
> Best, Ruibin
>
>


Deploy Flink on YARN or Kubernetes.

2022-12-18 Thread Ruibin Xing
Hi all,

We are currently setting up a new Flink cluster and are trying to decide on
the best deployment method. As far as we know, Flink supports two resource
providers: YARN and Kubernetes. We are having difficulty evaluating the
pros and cons of each provider, particularly in terms of stability,
popularity, community support, and toolchains.

Could someone please share their experience with these resource providers
and provide a recommendation for our new cluster?

Best, Ruibin


Re: flink on yarn: job dies and restarts repeatedly

2022-07-25 Thread Weihua Hu
You can check whether the JobManager ran out of memory and was OOM-killed; if you have more logs, please paste them as well.

Best,
Weihua


On Mon, Jul 18, 2022 at 8:41 PM SmileSmile  wrote:

> hi all,
> We hit this scenario: flink on yarn with parallelism 3000; the job contains
> multiple agg operations; the job reliably fails to recover from a checkpoint
> or savepoint and restarts repeatedly.
> JM error: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>
> Any good ideas for troubleshooting?
>
>
>
>
>


Re: flink on yarn job always restart

2022-07-18 Thread SmileSmile
When I turned off zk's HA configuration and ran a failure test, YARN's
ResourceManager log showed the following message.

2022-07-18 23:42:53,633 WARN 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=hadoop   
OPERATION=Application Finished - Failed TARGET=RMAppManager RESULT=FAILURE  
DESCRIPTION=App failed with state: FAILED   PERMISSIONS=Application 
application_1658129937018_0030 failed 1 times (global limit =2; local limit is 
=1) due to AM Container for appattempt_1658129937018_0030_01 exited with  
exitCode: -104
Failing this attempt.Diagnostics: Container 
[pid=760348,containerID=container_e01_1658129937018_0030_01_01] is running 
beyond physical memory limits. Current usage: 80.1 GB of 80 GB physical memory 
used; 86.1 GB of 640 GB virtual memory used. Killing container.

My JM memory is set to 80 GB; it's hard to imagine an OOM for a component like
the JM, which doesn't run business logic (job parallelism is 3000, with
multiple agg operations and sinks).
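As a data point for tuning rather than a recommendation: the YARN container size is the JM *process* size, which includes the heap plus metaspace, off-heap memory, and JVM overhead; the native parts can push the process past the container limit even when the heap is fine. A sketch of the relevant flink-conf.yaml keys (values are assumptions to adjust):

```yaml
# Total JM container size requested from YARN; must stay under the YARN limit.
jobmanager.memory.process.size: 80g
# Leave headroom for native memory (RPC/akka buffers, GC) outside the heap.
jobmanager.memory.jvm-overhead.max: 8g
jobmanager.memory.off-heap.size: 1g
jobmanager.memory.jvm-metaspace.size: 512m
```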



 Replied Message 
| From | Geng Biao |
| Date | 07/18/2022 23:31 |
| To | SmileSmile |
| Cc | user |
| Subject | Re: flink on yarn job always restart |

The log shows that “Diagnostics Cluster entrypoint has been closed 
externally..” So are you trying to kill the YARN cluster entrypoint process 
directly in the terminal using “kill ”? If users want to kill a TM, they 
should go to the machine that the TM process resides and kill the TM process. 
Cluster entrypoint is the driver to launch the flink cluster on YARN, not JM or 
TM process.

The zk HA is for JM(i.e. starting a new JM when previous JM fails) and TM is 
managed by JM which, IIUC, does not directly interact with zk. It is possible 
that JM will be restarted repeated (check details in this doc ) due to wrong 
configuration but it may not be your case here.

 

Best,

Biao Geng

 

From: SmileSmile 
Date: Monday, July 18, 2022 at 11:08 PM
To: biaogeng7 
Cc: user 
Subject: Re: flink on yarn job always restart

Thanks for the reply, our scenario was a failure test to see if the job would 
recover on its own after killing a TM.
It turns out that the job receives SIGNAL 15 and hangs during the switch from
DEPLOYING to INITIALIZING, and with zk's HA it appears to restart repeatedly.

My confusion
1. why does it receive SIGNAL 15
2. is it because of some configuration? (e.g. deploy timeout causing kill?)



 Replied Message 
| From | Geng Biao |
| Date | 07/18/2022 22:36 |
| To | SmileSmile, user |
| Cc | |
| Subject | Re: flink on yarn job always restart |

Hi,

 

One possible direction is to check your YARN log or TM log to see if the YARN 
RM kills the TM for some reason(e.g. physical memory is over limit) and as a 
result, the JM will try to recover the TM repeatedly according to your restart 
strategy.

The snippet of JM logs you provide is usually not the root cause.

 

Best,
Biao Geng

 

From: SmileSmile 
Date: Monday, July 18, 2022 at 8:46 PM
To: user 
Subject: flink on yarn job always restart

hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
JM error log: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
flink version 1.14.5
Any good ideas for troubleshooting?







Re: flink on yarn job always restart

2022-07-18 Thread Geng Biao
The log shows that “Diagnostics Cluster entrypoint has been closed 
externally..” So are you trying to kill the YARN cluster entrypoint process 
directly in the terminal using “kill ”? If users want to kill a TM, they 
should go to the machine that the TM process resides and kill the TM process. 
Cluster entrypoint is the driver to launch the flink cluster on YARN, not JM or 
TM process.
The zk HA is for JM(i.e. starting a new JM when previous JM fails) and TM is 
managed by JM which, IIUC, does not directly interact with zk. It is possible 
that JM will be restarted repeated (check details in this 
doc<https://help.aliyun.com/document_detail/411149.html#section-cco-ygc-hfe> ) 
due to wrong configuration but it may not be your case here.

Best,
Biao Geng

From: SmileSmile 
Date: Monday, July 18, 2022 at 11:08 PM
To: biaogeng7 
Cc: user 
Subject: Re: flink on yarn job always restart
Thanks for the reply, our scenario was a failure test to see if the job would 
recover on its own after killing a TM.
It turns out that the job receives SIGNAL 15 and hangs during the switch from
DEPLOYING to INITIALIZING, and with zk's HA it appears to restart repeatedly.

My confusion
1. why does it receive SIGNAL 15
2. is it because of some configuration? (e.g. deploy timeout causing kill?)

 Replied Message 
From
Geng Biao<mailto:biaoge...@gmail.com>
Date
07/18/2022 22:36
To
SmileSmile<mailto:a511955...@163.com>、user<mailto:user@flink.apache.org>
Cc
Subject
Re: flink on yarn job always restart
Hi,

One possible direction is to check your YARN log or TM log to see if the YARN 
RM kills the TM for some reason(e.g. physical memory is over limit) and as a 
result, the JM will try to recover the TM repeatedly according to your restart 
strategy.
The snippet of JM logs you provide is usually not the root cause.

Best,
Biao Geng

From: SmileSmile mailto:a511955...@163.com>>
Date: Monday, July 18, 2022 at 8:46 PM
To: user mailto:user@flink.apache.org>>
Subject: flink on yarn job always restart
hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
JM error log: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
flink version 1.14.5
Any good ideas for troubleshooting?







Re: flink on yarn job always restart

2022-07-18 Thread SmileSmile
Thanks for the reply, our scenario was a failure test to see if the job would 
recover on its own after killing a TM.
It turns out that the job receives SIGNAL 15 and hangs during the switch from
DEPLOYING to INITIALIZING, and with zk's HA it appears to restart repeatedly.

My confusion
1. why does it receive SIGNAL 15
2. is it because of some configuration? (e.g. deploy timeout causing kill?)



 Replied Message 
| From | Geng Biao |
| Date | 07/18/2022 22:36 |
| To | SmileSmile、user |
| Cc | |
| Subject | Re: flink on yarn job always restart |

Hi,

 

One possible direction is to check your YARN log or TM log to see if the YARN 
RM kills the TM for some reason(e.g. physical memory is over limit) and as a 
result, the JM will try to recover the TM repeatedly according to your restart 
strategy.

The snippet of JM logs you provide is usually not the root cause.

 

Best,
Biao Geng

 

From: SmileSmile 
Date: Monday, July 18, 2022 at 8:46 PM
To: user 
Subject: flink on yarn job always restart

hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
JM error log: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
flink version 1.14.5
Any good ideas for troubleshooting?






Re: flink on yarn job always restart

2022-07-18 Thread Geng Biao
Hi,

One possible direction is to check your YARN log or TM log to see if the YARN 
RM kills the TM for some reason(e.g. physical memory is over limit) and as a 
result, the JM will try to recover the TM repeatedly according to your restart 
strategy.
The snippet of JM logs you provide is usually not the root cause.

Best,
Biao Geng

From: SmileSmile 
Date: Monday, July 18, 2022 at 8:46 PM
To: user 
Subject: flink on yarn job always restart
hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
JM error log: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
flink version 1.14.5
Any good ideas for troubleshooting?






Re: flink on yarn job always restart

2022-07-18 Thread SmileSmile
The previous logs were all job deployment logs, then suddenly JM received 
SIGNAL 15, and all components started to exit


2022-07-18 20:31:27,813 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - theOpeartion 
namex (839/3000) (f913468fb654c6d2c3466ef28d296396) switched from 
INITIALIZING to RUNNING.
2022-07-18 20:31:27,879 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED S
IGNAL 15: SIGTERM. Shutting down as requested.
2022-07-18 20:31:27,879 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - theOpeartion 
namex (2093/3000) (0a85fc4259b8e99dee2e9761131cac51) switched from 
DEPLOYING to INITIALIZING.
2022-07-18 20:31:27,882 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting Y
arnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics 
Cluster entrypoint has been clos
ed externally..
2022-07-18 20:31:27,884 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
down rest endpoint.
2022-07-18 20:31:27,886 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BL
OB server at 0.0.0.0:45857
2022-07-18 20:31:27,988 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing
cache directory /tmp/flink-web-3c48a98c-6642-40de-a52a-1794256c4495/flink-web-ui
2022-07-18 20:31:27,991 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - St
opping DefaultLeaderElectionService.
2022-07-18 20:31:27,991 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing 
ZooKeeperLeaderElectionDriver{leaderLatchPath='/leader/rest_server/latch', 
connectionInformationPath='/leader/rest_server/connection_info'}
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down 
complete.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing 
ZookeeperLeaderRetrievalDriver{connectionInformationPath='/leader/dispatcher/connection_info'}.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing 
ZookeeperLeaderRetrievalDriver{connectionInformationPath='/leader/resource_manager/connection_info'}.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2022-07-18 20:31:28,001 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing 
ZooKeeperLeaderElectionDriver{leaderLatchPath='/leader/dispatcher/latch', 
connectionInformationPath='/leader/dispatcher/connection_info'}
2022-07-18 20:31:28,020 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Stopping 
dispatcher akka.tcp://flink@ip:43728/user/rpc/dispatcher_0.

2022-07-18 20:31:28,023 INFO  
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - 
Interrupted while waiting for queue
java.lang.InterruptedException: null
   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 ~[?:1.8.0_322]
   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 ~[?:1.8.0_322]
   at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
~[?:1.8.0_322]
   at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:323)
 [hadoop-yarn-client-2.8.5.jar:?]
2022-07-18 20:31:28,029 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 'flink_test' 
(7a9a02f6aa168abc927732855e3d230f).
2022-07-18 20:31:28,067 INFO  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - Job 
7a9a02f6aa168abc927732855e3d230f reached terminal state SUSPENDED.



 Replied Message 
| From | Zhanghao Chen |
| Date | 07/18/2022 21:19 |
| To | SmileSmile、user |
| Cc | |
| Subject | Re: flink on yarn job always restart |
Hi, could you provide the whole JM log?


Best,
Zhanghao Chen
From: SmileSmile 
Sent: Monday, July 18, 2022 20:46
To: user 
Subject: flink on yarn job always restart
 
hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
jm error

Re: flink on yarn job always restart

2022-07-18 Thread Zhanghao Chen
Hi, could you provide the whole JM log?

Best,
Zhanghao Chen

From: SmileSmile 
Sent: Monday, July 18, 2022 20:46
To: user 
Subject: flink on yarn job always restart

hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
JM error log: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
flink version 1.14.5
Any good ideas for troubleshooting?







flink on yarn job always restart

2022-07-18 Thread SmileSmile
hi all
we hit a situation: with parallelism 3000, the job contains multiple agg
operations; recovery from a checkpoint or savepoint reliably fails, and the
job restarts repeatedly.
JM error log: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
flink version 1.14.5
Any good ideas for troubleshooting?







flink on yarn: job dies and restarts repeatedly

2022-07-18 Thread SmileSmile
hi all,
We hit this scenario: flink on yarn with parallelism 3000; the job contains
multiple agg operations; the job reliably fails to recover from a checkpoint
or savepoint and restarts repeatedly.
JM error: org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

Any good ideas for troubleshooting?






Re: Flink on yarn: how to obtain Spring Boot beans when parallelism > 1?

2022-04-22 Thread tison
@duwenwen I'm curious what your operator actually does, because if you only need a single global initialization, you can simply do it in an operator with parallelism=1.

With parallelism=n, if you still need once semantics, you will likely have to rely on an external system to guarantee the initialization happens only once.
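Within a single TaskManager JVM, one common pattern is a lazily initialized static holder, so all parallel subtasks calling it from open() share one context instead of each bootstrapping Spring. A hedged sketch, where initContext() is a placeholder for the real SpringApplication.run(...) call; as noted above, this does not give cluster-wide once semantics:

```java
// Sketch: initialize a heavy shared context at most once per JVM, even when
// several parallel subtasks call get() concurrently from RichFunction#open().
public final class SharedContextHolder {
    private static volatile Object context;
    private static int initCount = 0; // for illustration; guarded by the class lock

    private SharedContextHolder() {}

    public static Object get() {
        Object local = context;
        if (local == null) {                       // first check without locking
            synchronized (SharedContextHolder.class) {
                local = context;
                if (local == null) {               // second check under the lock
                    local = initContext();
                    context = local;
                }
            }
        }
        return local;
    }

    static synchronized int initCount() {
        return initCount;
    }

    // Placeholder for the real bootstrap, e.g. SpringApplication.run(App.class).
    private static Object initContext() {
        initCount++;
        return new Object();
    }
}
```

Each subtask would then call SharedContextHolder.get() in open() and fetch beans from the returned context. Note this is once-per-JVM: with several TaskManagers you still get one context per TM process.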

Best,
tison.


Paul Lam  于2022年4月22日周五 18:16写道:

> It sounds like you are starting Spring Boot inside Flink? A very interesting
> architecture, somewhat similar to statefun. Could you share the background
> for this design?
>
> Also, please include the Flink deployment mode and version, so everyone can
> better judge where the problem lies.
>
> Best,
> Paul Lam
>
> > On Apr 22, 2022, at 16:30, duwenwen wrote:
> >
> > Hello:
> > First of all, thank you very much for reading my email despite your busy
> schedule. I am new to writing code, and while using the Flink framework I
> ran into some problems that I hope you can help answer.
> Due to requirements, I need to combine Spring Boot with Flink. I obtain the
> Spring Boot application context in the open() method to fetch beans. With
> parallelism set to 1, the job deploys to the cluster and runs normally, but
> when parallelism > 1 the Spring Boot environment is initialized multiple
> times and the job starts failing. When parallelism > 1, or when there is
> more than one taskmanager, what should I do to correctly obtain Spring Boot
> beans? I would greatly appreciate a solution. Many thanks.
>
>


Re: Flink on yarn: how to obtain Spring Boot beans when parallelism > 1?

2022-04-22 Thread Paul Lam
It sounds like you are starting Spring Boot inside Flink? A very interesting architecture, somewhat similar to statefun. Could you share the background for this design?

Also, please include the Flink deployment mode and version, so everyone can better judge where the problem lies.

Best,
Paul Lam

> On Apr 22, 2022, at 16:30, duwenwen wrote:
> 
> Hello:
> First of all, thank you very much for reading my email despite your busy
> schedule. I am new to writing code, and while using the Flink framework I
> ran into some problems that I hope you can help answer.
> Due to requirements, I need to combine Spring Boot with Flink. I obtain the
> Spring Boot application context in the open() method to fetch beans. With
> parallelism set to 1, the job deploys to the cluster and runs normally, but
> when parallelism > 1 the Spring Boot environment is initialized multiple
> times and the job starts failing. When parallelism > 1, or when there is
> more than one taskmanager, what should I do to correctly obtain Spring Boot
> beans? I would greatly appreciate a solution. Many thanks.



Flink on yarn: how to obtain Spring Boot beans when parallelism > 1?

2022-04-22 Thread duwenwen
Hello:
First of all, thank you very much for reading my email despite your busy
schedule. I am new to writing code, and while using the Flink framework I ran
into some problems that I hope you can help answer.
Due to requirements, I need to combine Spring Boot with Flink. I obtain the
Spring Boot application context in the open() method to fetch beans. With
parallelism set to 1, the job deploys to the cluster and runs normally, but
when parallelism > 1 the Spring Boot environment is initialized multiple
times and the job starts failing. When parallelism > 1, or when there is more
than one taskmanager, what should I do to correctly obtain Spring Boot beans?
I would greatly appreciate a solution. Many thanks.

Re: flink on yarn: exception when stopping the job

2022-03-08 Thread Jiangang Liu
The exception message is quite clear: during the savepoint, some tasks were not in RUNNING state. Check whether your job experienced a failover.

QiZhu Chan wrote on Tue, Mar 8, 2022 at 17:37:

> Hi,
>
> Community folks, could you help me figure out what causes the following
> error? Normally the client log should return a savepoint path, but the
> following exception log appeared instead; meanwhile the job has been
> stopped, and checking HDFS shows a savepoint file was produced for this job.
>
>
>
>
>


Re: flink on yarn: AM attempt fails after the HDFS_DELEGATION_TOKEN is purged

2022-02-10 Thread xieyi
One more piece of information:
In the failure case,
security.kerberos.login.keytab was correctly configured in the client's flink-conf.yaml.




On 2022-02-11 15:18, xieyi wrote:


Hello teachers:
I'd like to ask a question.
Since a Hadoop delegation token expires and is purged once it exceeds its max
lifetime (7 days by default), YARN mentions three strategies for long-running services to deal with this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


I'd like to know how flink on yarn handles Hadoop delegation token expiry; the official docs do not seem to explain this clearly.


We are currently seeing the following failure in production:
Flink 1.12 on YARN; the YARN NodeManagers are containerized and occasionally
crash and restart. Once a Flink job has run for more than 7 days, if the
NodeManager hosting that job's JM (AM) restarts, the AM makes a new attempt
(the attempt picks up token 1377, obtained at submission time, but that token
has already been purged from the NameNode), and the attempt fails with:


Failing this attempt.Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377 
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377for user***) can't be found in cache


Questions: after the Hadoop delegation token is purged, how does flink on yarn
renew it? Is a new token generated?
If a new token is generated, why does the AM attempt still pick up the purged token (1377)?
Is this failure related to the containerized NodeManager deployment? After the NodeManager restarts, were the files holding the keytab wiped?
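For context, with keytab-based Kerberos authentication Flink ships the keytab into the cluster and can re-obtain credentials itself, rather than depending only on the delegation tokens captured at submission time. A minimal flink-conf.yaml sketch (the path and principal below are illustrative):

```yaml
security.kerberos.login.use-ticket-cache: false
# The keytab is shipped to the JM/TM containers and used to log in again.
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
```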



flink on yarn: AM attempt fails after the HDFS_DELEGATION_TOKEN is purged

2022-02-10 Thread xieyi


Hello teachers:
I'd like to ask a question.
Since a Hadoop delegation token expires and is purged once it exceeds its max
lifetime (7 days by default), YARN mentions three strategies for long-running services to deal with this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


I'd like to know how flink on yarn handles Hadoop delegation token expiry; the official docs do not seem to explain this clearly.


We are currently seeing the following failure in production:
Flink 1.12 on YARN; the YARN NodeManagers are containerized and occasionally
crash and restart. Once a Flink job has run for more than 7 days, if the
NodeManager hosting that job's JM (AM) restarts, the AM makes a new attempt
(the attempt picks up token 1377, obtained at submission time, but that token
has already been purged from the NameNode), and the attempt fails with:


Failing this attempt.Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377 
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377for user***) can't be found in cache


Questions: after the Hadoop delegation token is purged, how does flink on yarn
renew it? Is a new token generated?
If a new token is generated, why does the AM attempt still pick up the purged token (1377)?
Is this failure related to the containerized NodeManager deployment? After the NodeManager restarts, were the files holding the keytab wiped?



flink on yarn: AM attempt fails after the HDFS_DELEGATION_TOKEN is purged

2022-02-10 Thread xieyi


Hello teachers:
I'd like to ask a question.
Since a Hadoop delegation token expires and is purged once it exceeds its max
lifetime (7 days by default), YARN mentions three strategies for long-running services to deal with this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


I'd like to know how flink on yarn handles Hadoop delegation token expiry; the official docs do not seem to explain this clearly.


We are currently seeing the following failure in production:
Flink 1.12 on YARN; the YARN NodeManagers are containerized and occasionally
crash and restart. Once a Flink job has run for more than 7 days, if the
NodeManager hosting that job's JM (AM) restarts, the AM makes a new attempt
(the attempt picks up token 13770506, obtained at submission time, but that
token has already been purged from the NameNode), and the attempt fails with:


Failing this attempt.Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377 
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377for user***) can't be found in cache


疑问: flink on yarn在HADOOP Delegation token清除后,是如何更新的呢?是生成了新的token吗?
  如果生成了新的token,为何am attempt 时,还会继续获取已清除的这个token(13770506)
这个故障是否和nodemanager容器化部署有关?nodemanager重启后,因为保存keytab的相关文件被清除了?





Re: Re: Flink on yarn accessing multiple HDFS clusters

2021-12-06 Thread casel.chen
What about two sets of oss or s3 buckets (each bucket with its own
accessKey/secret)? For example, writing data to bucketA but checkpointing to bucketB.
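
One hedged option for the s3 case, assuming the flink-s3-fs-hadoop (s3a)
filesystem: Hadoop's S3A supports per-bucket configuration, and Flink forwards
flink.hadoop.* keys into the Hadoop configuration, so each bucket can carry its
own credentials. A flink-conf.yaml sketch (bucket names and values are
placeholders; the oss filesystem would need its own mechanism):

```yaml
# Per-bucket S3A credentials, following Hadoop's fs.s3a.bucket.<name>.* scheme.
flink.hadoop.fs.s3a.bucket.bucketA.access.key: ACCESS_KEY_A
flink.hadoop.fs.s3a.bucket.bucketA.secret.key: SECRET_KEY_A
flink.hadoop.fs.s3a.bucket.bucketB.access.key: ACCESS_KEY_B
flink.hadoop.fs.s3a.bucket.bucketB.secret.key: SECRET_KEY_B
```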

On 2021-12-06 18:59:46, "Yang Wang"  wrote:
>I think you can try shipping a local hadoop conf and setting the
>HADOOP_CONF_DIR environment variable:
>
>-yt /path/of/my-hadoop-conf
>-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>
>
>Best,
>Yang


Re: Flink on yarn accessing multiple HDFS clusters

2021-12-06 Thread Yang Wang
I think you can try shipping a local hadoop conf and setting the
HADOOP_CONF_DIR environment variable:

-yt /path/of/my-hadoop-conf
-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'


Best,
Yang
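
Put together, the submission could look roughly like this (the local conf path
and the job jar are placeholders; the shipped directory keeps its base name
inside the YARN container, hence $PWD/my-hadoop-conf):

```
flink run -m yarn-cluster \
  -yt /path/of/my-hadoop-conf \
  -yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf' \
  -yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf' \
  ./my-job.jar
```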



Flink on yarn accessing multiple HDFS clusters

2021-11-29 Thread chenqizhu
Hi all:

Flink 1.13 supports configuring hadoop properties in flink-conf.yaml via the
flink.hadoop.* prefix. We want to write checkpoints to an HDFS cluster backed
by SSDs (call it cluster B) to speed up checkpoint writes, but that cluster is
not the Flink client's local default HDFS (call the default one cluster A). So
we configured the nameservices of both clusters in flink-conf.yaml, similar to
HDFS federation, to reach both clusters. The concrete configuration:


flink.hadoop.dfs.nameservices: ACluster,BCluster
flink.hadoop.fs.defaultFS: hdfs://BCluster


flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
flink.hadoop.dfs.client.failover.proxy.provider.ACluster:
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider


flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
flink.hadoop.dfs.client.failover.proxy.provider.BCluster:
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider


But the job fails at startup. It seems the HA nameservice configuration for
cluster B is not recognized, and BCluster is instead resolved as a hostname.
The error is below.
(If the setting is switched back to the client's local default cluster A, the
job starts fine: flink.hadoop.fs.defaultFS: hdfs://ACluster)


Caused by: BCluster
java.net.UnknownHostException: BCluster
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Is there a solution to this? The pain point is getting Flink to access two
HDFS clusters, ideally configured purely through flink-conf.yaml.


My component versions:
flink : 1.13.3
hadoop : 3.3.0


Looking forward to a reply, thanks!
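
For the shipped my-hadoop-conf directory suggested in Yang Wang's reply above,
the hdfs-site.xml inside it would need to declare both nameservices. This is
essentially the XML form of the flink.hadoop.dfs.* keys, so the error surfaced
by the YARN-side localizer can also resolve BCluster (hosts are placeholders):

```xml
<!-- hdfs-site.xml fragment for a conf dir shipped with -yt -->
<property><name>dfs.nameservices</name><value>ACluster,BCluster</value></property>
<property><name>dfs.ha.namenodes.BCluster</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.BCluster.nn1</name><value>nn1-host:9000</value></property>
<property><name>dfs.namenode.rpc-address.BCluster.nn2</name><value>nn2-host:9000</value></property>
<property>
  <name>dfs.client.failover.proxy.provider.BCluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
```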


Re: flink on yarn per-job submission fails, but session mode works

2021-11-04 Thread 刘建刚
The information above alone is not enough to tell. You can check the detailed
logs via the link inside it:
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028

陈卓宇 <2572805...@qq.com.invalid> wrote on Thu, Nov 4, 2021 at 6:29 PM:

> yarn error log:
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.   at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
>  at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
> Caused by: java.lang.VerifyError: class
> org.apache.flink.yarn.YarnResourceManager overrides final method
> onStop.()Ljava/util/concurrent/CompletableFuture;  at
> java.lang.ClassLoader.defineClass1(Native Method)at
> java.lang.ClassLoader.defineClass(ClassLoader.java:763)  at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at
> java.security.AccessController.doPrivileged(Native Method)   at
> java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at
> java.lang.ClassLoader.loadClass(ClassLoader.java:424)at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)at
> java.lang.ClassLoader.loadClass(ClassLoader.java:357)at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
>at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> at java.security.AccessController.doPrivileged(Native Method)   at
> javax.security.auth.Subject.doAs(Subject.java:422)   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
>at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
>  ... 2 common frames omitted
>
> Diagnostics:
> Application application_1635998548270_0028 failed 1  times
> (global limit =2; local limit is =1) due to AM Container for
> appattempt_1635998548270_0028_01 exited with  exitCode: 1
>
> For more detailed output, check
> the application  tracking page:
> http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028
> Then click on links to logs of each attempt.
> Diagnostics: Exception from
> container-launch.
> Container id:
> container_e391_1635998548270_0028_01_01
> Exit code: 1
> Stack trace: ExitCodeException
> exitCode=1:
> at
> org.apache.hadoop.util.Shell.runCommand(Shell.java:944)
> at
> org.apache.hadoop.util.Shell.run(Shell.java:848)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
>
> at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at
> java.lang.Thread.run(Thread.java:748)
>
>
> Container exited with a non-zero
> exit code 1
> Failing this attempt. Failing the
> application.
>
> 陈
>
>
> 

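
As a side note on the root cause: a java.lang.VerifyError such as "class
org.apache.flink.yarn.YarnResourceManager overrides final method onStop"
typically indicates a Flink version mismatch, i.e. the submitted fat jar
bundles Flink runtime classes from a different release than the cluster. One
way to avoid this, assuming a Maven build, is to mark Flink's own artifacts as
provided so they are never shaded into the job jar (artifact and version
below are illustrative):

```xml
<!-- Sketch: keep Flink core dependencies out of the fat jar; the cluster
     supplies them at runtime. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
```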


flink 1.13.1: a Flink SQL job (mysql to hive) in yarn-application mode uses 16G+ of TaskManager memory

2021-11-04 Thread Asahi Lee
hi!
Running a Flink SQL job that reads from mysql and writes into hive in
yarn-application mode, the TaskManager memory usage exceeds 16G.

Re: How are task logs and checkpoints monitored in production for Flink on yarn?

2021-08-31 Thread JasonLee
Hi


You can refer to these two articles:
https://mp.weixin.qq.com/s/2S4M8p-rBRinIRxmZrZq5Q 
https://mp.weixin.qq.com/s/44SXmCAUOqSWhQrNiZftoQ


Best
JasonLee


On 2021-08-31 13:23, guanyq wrote:
flink on yarn starts many tasks in the cluster; in production, how do you monitor the task logs and checkpoints?


Any guidance would be appreciated.

How are task logs and checkpoints monitored in production for Flink on yarn?

2021-08-30 Thread guanyq
flink on yarn starts many tasks in the cluster; in production, how do you monitor the task logs and checkpoints?


Any guidance would be appreciated.
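
Beyond the articles above, checkpoint health can also be polled from Flink's
REST API (`GET /jobs/<job-id>/checkpoints` on the JobManager). The endpoint
and the `counts`/`latest` fields follow Flink's documented response shape, but
the summarizing helper below is an illustrative sketch, not part of any Flink
client library:

```python
def summarize_checkpoints(payload: dict) -> dict:
    """Reduce a /jobs/<id>/checkpoints response to the numbers worth alerting on."""
    counts = payload["counts"]
    # "latest.completed" may be absent if no checkpoint has completed yet.
    latest = payload.get("latest", {}).get("completed") or {}
    return {
        "completed": counts["completed"],
        "failed": counts["failed"],
        "in_progress": counts["in_progress"],
        "latest_duration_ms": latest.get("end_to_end_duration"),
        "latest_size_bytes": latest.get("state_size"),
    }

# Example response fragment (field names follow the REST docs; values are made up).
sample = {
    "counts": {"completed": 42, "failed": 1, "in_progress": 0, "restored": 0, "total": 43},
    "latest": {"completed": {"end_to_end_duration": 3100, "state_size": 52428800}},
}
print(summarize_checkpoints(sample))
```

In production the payload would come from an HTTP GET against the JobManager,
with an alert raised when `failed` grows or `latest_duration_ms` trends up.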

RE: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-20 Thread Hailu, Andreas [Engineering]
Hi David, I was able to get this working using your suggestion:


1) Deploy a Flink YARN session cluster, noting the host + port of the
session's JobManager.

2) Submit the Flink job using the session's details, i.e. submit the job
with the '-m host:port' option.

Thanks for clearing things up.
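
The two steps above can be sketched as follows (memory/slot values and the jar
name are placeholders; yarn-session.sh prints the JobManager address on
startup):

```
# 1) Start a long-running session cluster on YARN; note the JobManager
#    host:port it reports.
./bin/yarn-session.sh -jm 4096m -tm 18432m -s 2 -d

# 2) Submit jobs against that session's JobManager (host:port from step 1).
./bin/flink run -m jobmanager-host:8081 ./my-job.jar
```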

// ah

From: David Morávek 
Sent: Tuesday, August 17, 2021 4:37 AM
To: Hailu, Andreas [Engineering] 
Cc: Ravichandran, Soorya Prasanna [Engineering] 
; user@flink.apache.org
Subject: Re: Upgrading from Flink on YARN 1.9 to 1.11

Hi Andreas,

the problem here is that the command you're using starts a per-job cluster
(which is obvious from the used deployment method
"YarnClusterDescriptor.deployJobCluster"). Apparently the `-m yarn-cluster`
flag is deprecated and no longer supported; I think this is something we should
completely remove in the near future. Also, this was always supposed to start
your job in per-job mode, but unfortunately in older versions this was kind of
simulated using a session cluster, so I'd say it just worked by accident
(a.k.a. an "undocumented bug / feature").

What you really want to do is to start a session cluster upfront and then use a
`yarn-session` deployment target (where you need to provide the yarn application
id so Flink can find the active JobManager). This is well documented in the
yarn section of the docs [1].

Can you please try this approach and let me know if it helped?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
D.

On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com> wrote:
Hi David,

You’re correct about classpathing problems – thanks for your help in spotting 
them. I was able to get past that exception by removing some conflicting 
packages in my shaded JAR, but I’m seeing something else that’s interesting. 
With the 2 threads trying to submit jobs, one of the threads is able to submit 
and process data successfully, while the other thread fails.

Log snippet:
2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56247060 for delp)
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain Kerberos 
security token for HBase
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available (not 
packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job graph 
to local resource fail.
2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor - Submitting 
application master application_1628992879699_11275
2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running data 
flow for thread-2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExe

Re: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-17 Thread David Morávek
mpl - Timeline
> service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted
> application application_1628992879699_11275
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting
> for the cluster to be allocated
>
> 2021-08-16 13:43:21,379 [thread-1] INFO  YarnClusterDescriptor - Deploying
> cluster, current state ACCEPTED
>
> 2021-08-16 13:43:28,435 [thread-1] INFO  YarnClusterDescriptor - YARN
> application has been deployed successfully.
>
> 2021-08-16 13:43:28,436 [thread-1] INFO  YarnClusterDescriptor - Found Web
> Interface d279536-023.dc.gs.com:41019 of application
> 'application_1628992879699_11275'.
>
> 2021-08-16 13:43:28,443 [thread-1] INFO  AbstractJobClusterExecutor - Job
> has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
>
> Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
>
> 2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO  FlinkJobSubmitter$2
> - job completed for thread-2 with parallelism 1
>
> Program execution finished
>
> Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished.
>
>
>
> I’ve generated and sent you a signup link to our firm’s secure
> document-sharing app called Lockbox. In the repository, I’ve uploaded both
> our full client and YARN app logs (named half_failure-client_log and
> half_failure-yarn-log, respectively) in a directory named Flink support
> logs/Flink 1.11/1.11.2_POC. The logs are quite brief – would you be able to
> have a look and see if there's something we're doing that's clearly wrong?
>
>
>
> Something I did notice is that with the upgrade, our submissions now go
> through the new ContextEnvironment#executeAsync method. If it means anything,
> our client doesn't require asynchronous job submission.
>
> *// *ah
>
>
>
> *From:* David Morávek 
> *Sent:* Monday, August 16, 2021 6:28 AM
> *To:* Hailu, Andreas [Engineering] 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Upgrading from Flink on YARN 1.9 to 1.11
>
>
>
> Hi Andreas,
>
>
>
> Per-job and session deployment modes should not be affected by this FLIP.
> Application mode is just a new deployment mode (where the job driver runs
> embedded within the JM) that co-exists with these two.
>
>
>
> From the information you've provided, I'd say your actual problem is this
> exception:
>
>
>
> ```
>
> Caused by: java.lang.ExceptionInInitializerError
>
> at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
>
> at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
>
> at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
>
> at com.sun.jersey.api.client.Client.init(Client.java:342)
>
> at com.sun.jersey.api.client.Client.access$000(Client.java:118)
>
> at com.sun.jersey.api.client.Client$1.f(Client.java:191)
>
> at com.sun.jersey.api.client.Client$1.f(Client.java:187)
>
> at
> com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
>
> at com.sun.jersey.api.client.Client.<init>(Client.java:187)
>
> at com.sun.jersey.api.client.Client.<init>(Client.java:170)
>
> at
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
> ```
>
>
>
> I've seen this exception a few times with Hadoop already and it's usually
> a dependency / class-path problem. If you google for this you'll find many
> references.
>
>
>
> Best,
>
> D.
>
>
>
>
>
> On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] <
> andreas.ha...@gs.com> wrote:
>
> Hello folks!
>
>
>
> We’re looking to upgrade from 1.9 to 1.11. Our Flink applications run on
> YARN and each have their own clusters, with each application having
> multiple jobs submitted.
>
>
>
> Our current submission command looks like this:
>
> $ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name
> -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar
> -application-args-go-here
>
>
>
> The behavior observed in versions <= 1.9 is the following:
>
> 1. A Flink cluster gets deployed to YARN
>
> 2. Our application code is run, building graphs and submitting jobs
>
>
>
> When we rebuild and submit using 1.11.2, we now observe the following:
>
> 1. Our application code is run, building graph and submitting jobs
>
> 2. A Flink cluster gets deployed to YARN once execute()

Re: Flink on YARN HA deployment: Flink program fails to start

2021-08-17 Thread 周瑞
Hello, my version is 1.13.1.


--Original--
From: "Yang Wang"
https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 

Re: Flink on YARN HA deployment: Flink program fails to start

2021-08-17 Thread Yang Wang
From the error, this looks like a known issue [1] that has already been fixed in 1.11.2.

[1]. https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 wrote on Tue, Aug 17, 2021 at 11:04 AM:

> Hello: our Flink program is deployed on YARN and started in Application Mode.
> It starts fine without HA, but after configuring HA it fails to start. Could
> you please help us find out what is causing this?
>
>
> The HA configuration is as follows:
> high-availability: zookeeper
> high-availability.storageDir: hdfs://mycluster/flink/ha
> high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: /flink_cluster
>
>
> The exception is as follows:
> 2021-08-17 10:24:18,938 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
> 2021-08-17 10:25:09,706 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
> serialize the result for RPC call : requestTaskManagerDetailsInfo.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
> ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_292]
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_292]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> 

Flink on YARN HA deployment: Flink program fails to start

2021-08-16 Thread 周瑞
Hello: our Flink program is deployed on YARN and started in Application Mode. 
It starts fine without HA, but after configuring HA it fails to start. Could you 
please help us find out what is causing this?


The HA configuration is as follows:
high-availability: zookeeper
high-availability.storageDir: hdfs://mycluster/flink/ha
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink_cluster


The exception is as follows:
2021-08-17 10:24:18,938 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-08-17 10:25:09,706 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] 
- Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to 
serialize the result for RPC call : requestTaskManagerDetailsInfo.
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
 ~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) 
~[?:1.8.0_292]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.io.NotSerializableException: 
org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
~[?:1.8.0_292]
at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
~[?:1.8.0_292]
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:387)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 29 more
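The root cause above, a non-Serializable class crossing an RPC boundary, can be reproduced in plain Java: ObjectOutputStream.writeObject throws NotSerializableException for any object whose class does not implement Serializable. The nested class below is a simplified stand-in for Flink's TaskManagerInfoWithSlots, not the real class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializationDemo {
    // Stand-in for TaskManagerInfoWithSlots: deliberately does NOT
    // implement Serializable, so Java serialization rejects it.
    static class TaskManagerInfoWithSlots {
        final int slots = 2;
    }

    // Attempt Java serialization and report the outcome as a string.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            // Same failure mode as the RPC result serialization above.
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new TaskManagerInfoWithSlots()));
        System.out.println(trySerialize("hello")); // String is Serializable
    }
}
```

This is why the fix referenced in FLINK-19212 is on Flink's side: the type returned over RPC has to be serializable.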

RE: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-16 Thread Hailu, Andreas [Engineering]
If it means 
anything, our client doesn’t require asynchronous job submission.

// ah

From: David Morávek 
Sent: Monday, August 16, 2021 6:28 AM
To: Hailu, Andreas [Engineering] 
Cc: user@flink.apache.org
Subject: Re: Upgrading from Flink on YARN 1.9 to 1.11

Hi Andreas,

Per-job and session deployment modes should not be affected by this FLIP. 
Application mode is just a new deployment mode (where the job driver runs embedded 
within the JM) that co-exists with these two.

From the information you've provided, I'd say your actual problem is this exception:

```
Caused by: java.lang.ExceptionInInitializerError
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
at com.sun.jersey.api.client.Client.init(Client.java:342)
at com.sun.jersey.api.client.Client.access$000(Client.java:118)
at com.sun.jersey.api.client.Client$1.f(Client.java:191)
at com.sun.jersey.api.client.Client$1.f(Client.java:187)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
at com.sun.jersey.api.client.Client.<init>(Client.java:187)
   at com.sun.jersey.api.client.Client.<init>(Client.java:170)
at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```

I've seen this exception a few times with Hadoop already and it's usually a 
dependency / class-path problem. If you google for this you'll find many 
references.

Best,
D.


On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com> wrote:
Hello folks!

We’re looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN 
and each have their own clusters, with each application having multiple jobs 
submitted.

Our current submission command looks like this:
$ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm 
app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar 
-application-args-go-here

The behavior observed in versions <= 1.9 is the following:

1. A Flink cluster gets deployed to YARN

2. Our application code is run, building graphs and submitting jobs

When we rebuild and submit using 1.11.2, we now observe the following:

1. Our application code is run, building graph and submitting jobs

2. A Flink cluster gets deployed to YARN once execute() is invoked

I presume that this is a result of FLIP-85 [1] ?

This change in behavior proves to be a problem for us as our application is 
multi-threaded, and each thread submits its own job to the Flink cluster. What 
we see is that the first thread to call execute() submits a job to YARN, and the 
others fail with a ClusterDeploymentException.

2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to obtain 
Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not available 
(not packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor - 
Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception 
running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(Y

Re: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-16 Thread David Morávek
Hi Andreas,

Per-job and session deployment modes should not be affected by this FLIP.
Application mode is just a new deployment mode (where job driver runs
embedded within JM), that co-exists with these two.

From the information you've provided, I'd say your actual problem is this
exception:

```

Caused by: java.lang.ExceptionInInitializerError

at
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)

at
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)

at
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)

at com.sun.jersey.api.client.Client.init(Client.java:342)

at com.sun.jersey.api.client.Client.access$000(Client.java:118)

at com.sun.jersey.api.client.Client$1.f(Client.java:191)

at com.sun.jersey.api.client.Client$1.f(Client.java:187)

at
com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)

> at com.sun.jersey.api.client.Client.<init>(Client.java:187)
>
>    at com.sun.jersey.api.client.Client.<init>(Client.java:170)

at
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```


I've seen this exception a few times with Hadoop already and it's usually a
dependency / class-path problem. If you google for this you'll find many
references.


Best,

D.



On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] <
andreas.ha...@gs.com> wrote:

> Hello folks!
>
>
>
> We’re looking to upgrade from 1.9 to 1.11. Our Flink applications run on
> YARN and each have their own clusters, with each application having
> multiple jobs submitted.
>
>
>
> Our current submission command looks like this:
>
> $ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name
> -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar
> -application-args-go-here
>
>
>
> The behavior observed in versions <= 1.9 is the following:
>
> 1. A Flink cluster gets deployed to YARN
>
> 2. Our application code is run, building graphs and submitting jobs
>
>
>
> When we rebuild and submit using 1.11.2, we now observe the following:
>
> 1. Our application code is run, building graph and submitting jobs
>
> 2. A Flink cluster gets deployed to YARN once execute() is invoked
>
>
>
> I presume that this is a result of FLIP-85 [1] ?
>
>
>
> This change in behavior proves to be a problem for us as our application
> is multi-threaded, and each thread submits its own job to the Flink
> cluster. What we see is that the first thread to call execute() submits a job
> to YARN, and the others fail with a ClusterDeploymentException.
>
>
>
> 2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Cluster specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor -
> Cluster specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> Listening for transport dt_socket at address: 5005
>
> 2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Adding delegation token to the AM container.
>
> 2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created
> HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
>
> 2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for
> hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536,
> Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user)
>
> 2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to
> obtain Kerberos security token for HBase
>
> 2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not
> available (not packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HBaseConfiguration".
>
> 2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Submitting application master application_1628393898291_71530
>
> 2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception
> running data flow for flink-thread-#2
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
>
> at
> 

Upgrading from Flink on YARN 1.9 to 1.11

2021-08-13 Thread Hailu, Andreas [Engineering]
Hello folks!

We're looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN 
and each have their own clusters, with each application having multiple jobs 
submitted.

Our current submission command looks like this:
$ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm 
app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar 
-application-args-go-here

The behavior observed in versions <= 1.9 is the following:

1. A Flink cluster gets deployed to YARN

2. Our application code is run, building graphs and submitting jobs

When we rebuild and submit using 1.11.2, we now observe the following:

1. Our application code is run, building graph and submitting jobs

2. A Flink cluster gets deployed to YARN once execute() is invoked

I presume that this is a result of FLIP-85 [1] ?

This change in behavior proves to be a problem for us as our application is 
multi-threaded, and each thread submits its own job to the Flink cluster. What 
we see is that the first thread to call execute() submits a job to YARN, and the 
others fail with a ClusterDeploymentException.
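A sketch of one possible client-side workaround, assuming sequential submission is acceptable (the class and method names below are hypothetical, not a Flink API): funnel all submissions through a single-threaded executor so that only one deployment is in flight at a time.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializedSubmitter {
    // A single worker thread runs submissions one after another, so two
    // deployments never race as in the log below.
    private final ExecutorService submitExecutor = Executors.newSingleThreadExecutor();

    // 'job' stands in for the code that builds a graph and calls execute().
    public <T> Future<T> submit(Callable<T> job) {
        return submitExecutor.submit(job);
    }

    public void shutdown() {
        submitExecutor.shutdown();
    }
}
```

Callers still get a Future back immediately, so the rest of the application stays concurrent; only the deployment step is serialized.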

2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to obtain 
Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not available 
(not packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor - 
Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception 
running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
...
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
...
Caused by: java.lang.ExceptionInInitializerError
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
at com.sun.jersey.api.client.Client.init(Client.java:342)
at com.sun.jersey.api.client.Client.access$000(Client.java:118)
at com.sun.jersey.api.client.Client$1.f(Client.java:191)
at com.sun.jersey.api.client.Client$1.f(Client.java:187)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
at com.sun.jersey.api.client.Client.<init>(Client.java:187)
   at com.sun.jersey.api.client.Client.<init>(Client.java:170)
at 

Flink on YARN error

2021-07-30 Thread wangjingen
Could someone help take a look at this problem?
The RMClient's and YarnResourceManager's internal state about the 
number of pending container requests for resource has 
diverged. Number client's pending container requests 1 != Number RM's pending 
container requests 0;

flink on yarn: log4j configuration question (subject garbled in archive)

2021-07-22 Thread comsir
hi all
[the body of this message is garbled in the archive; it asks about log4j configuration for Flink on YARN]

Error submitting a job in Flink on yarn-cluster mode

2021-06-08 Thread maker_d...@foxmail.com
I submitted a job to a CDH cluster in Flink on yarn-cluster mode, and deployment failed because a jar could not be found. I do not use this jar myself, but it is present in Flink's lib directory, and I have already added the lib directory to the environment:
export HADOOP_CLASSPATH=/opt/cloudera/parcels/FLINK/lib/flink/lib


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not deploy Yarn job cluster.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1053)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1623148752688_0041 failed 1 
times (global limit =2; local limit is =1) due to AM Container for 
appattempt_1623148752688_0041_01 exited with  exitCode: -1000
Failing this attempt.Diagnostics: [2021-06-08 18:56:14.062]File 
file:/root/.flink/application_1623148752688_0041/lib/flink-table-blink_2.12-1.12.4.jar
 does not exist
java.io.FileNotFoundException: File 
file:/root/.flink/application_1623148752688_0041/lib/flink-table-blink_2.12-1.12.4.jar
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
at 
org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call
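A common cause of the FileNotFoundException above is the client resolving fs.defaultFS to the local filesystem, so the ship files are "uploaded" to file:/root/.flink and NodeManagers on other hosts cannot localize them. A hedged sketch of the check; `check_default_fs` is a made-up helper name, and the real command to run on the client is in the comment:

```shell
# Sketch: the staging dir in the error resolved to file:/root/.flink/..., i.e.
# the local filesystem. Verify the client actually resolves HDFS with:
#   hdfs getconf -confKey fs.defaultFS      # should print hdfs://<nameservice>
check_default_fs() {
  case "$1" in
    hdfs://*)   echo "ok: staging will go to HDFS" ;;
    file:*|"")  echo "problem: staging stays on the local filesystem" ;;
    *)          echo "unrecognized scheme: $1" ;;
  esac
}

# Demo with the value implied by the error message in this thread:
check_default_fs "file:///"
```

If the scheme is file://, the client's Hadoop configuration (core-site.xml) is not on the classpath, which ties back to the HADOOP_CLASSPATH setting quoted above.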

Re: log cleanup for flink on yarn

2021-06-07 Thread 王刚
You can configure the cleanup policy in the client-side log4j.properties or logback.xml.

First confirm which slf4j implementation you are actually using.
Original message
From: zjfpla...@hotmail.com
To: user-zh
Sent: Monday, June 7, 2021, 12:17
Subject: log cleanup for flink on yarn


Hi all,
Two questions:
Does flink on yarn mode have a log cleanup mechanism?
Does it follow the log4j/logback/log4j2 cleanup policies, or is it configured on the YARN side?
These are long-running streaming jobs, not one-off batch jobs.



zjfpla...@hotmail.com



Log cleanup for flink on yarn

2021-06-06 Thread zjfpla...@hotmail.com
Hi all,
Two questions:
Does flink on yarn mode have a log cleanup mechanism?
Does it follow the log4j/logback/log4j2 cleanup policies, or is it configured on the YARN side?
These are long-running streaming jobs, not one-off batch jobs.



zjfpla...@hotmail.com


Re: Re: Flink on yarn: YARN resource-manager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-31 Thread Yang Wang
With HA enabled, ZooKeeper records the counter and location of the last successful checkpoint; without HA, the job restores from the savepoint you specified.


Best,
Yang

刘建刚 wrote on Fri, May 28, 2021 at 18:51:

> Then the master failover must have lost the snapshot information; HA should solve this.
>
> 董建 <62...@163.com> 于2021年5月28日周五 下午6:24写道:
>
> > 稳定复现
> > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
> > 我们jobmanager没有做ha,不知道是否是这个原因导致的?
> > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
> > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-05-28 18:15:38,"刘建刚"  写道:
> > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
> > >1、从savepoint恢复;
> > >2、作业开始定期做savepoint;
> > >3、作业failover。
> > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
> > >如果还是有问题,需要通过日志来排查了。
> > >
> > >董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道:
> > >
> > >> 我遇到的问题现象是这样的
> > >>
> > >>
> > >>
> > >>
> > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
> > >>
> > >>
> > >>
> > >>
> > >> flink run -d -s
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> > >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> > >>
> > >>
> > >>
> > >>
> > >> 2、flink-conf.xml
> > >>
> > >>
> > >>
> > >>
> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> > >>
> > >>
> > >>
> > >>
> > >> 3、代码checkpoint设置
> > >>
> > >>
> > >>
> > >>
> > >>StreamExecutionEnvironment env =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>
> > >>
> > >>
> > >>
> > >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> > >> 10));
> > >>
> > >>
> > >>
> > >>
> > >>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >>
> > >>
> > >>
> > >>
> > >>env.enableCheckpointing(1 * 60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setTolerableCheckpointFailureNumber(100);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setCheckpointTimeout(60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setMaxConcurrentCheckpoints(1);
> > >>
> > >>
> > >>
> > >>
> > >> 4、问题现象
> > >>
> > >>
> > >>
> > >>
> > >> a)运维同事切换yarn
> > >>
> >
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
> > >>
> > >>
> > >>
> > >>
> > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> > >> restore,从日志中看还是从chk-100 restore的。
> > >>
> > >>
> > >>
> > >>
> > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction
> > >> sourceMilApplysLogStream = MySQLSource.builder()
> > >>
> > >>
> > >>
> > >>
> > >>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
> > >>
> > >>
> > >>
> > >>
> > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
> > >>
> > >>
> > >>
> > >>
> > >> 2021-05-24 16:49:50,398 INFO
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >>
> > >>
> > >>
> > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
> > >>
> > >>
> > >>
> > >>
> > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
> >
>


Re: Re: Flink on yarn: YARN resource-manager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 Thread 刘建刚
Then the master failover must have lost the snapshot information; HA should solve this.

董建 <62...@163.com> wrote on Fri, May 28, 2021 at 18:24:



Re:Re: Flink on yarn: YARN resource-manager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 Thread 董建
It reproduces reliably.
Checkpoints are generated normally; I can confirm them in both the web UI and the HDFS directory.
Our jobmanager has no HA; could that be the cause?
The logs show recovery from the path given with -s; without -s, a restart does not pick up the latest checkpoint file either.
This has bothered me for a long time with no good lead; the next step is to set up HA and try again.
>> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100

On 2021-05-28 18:15:38, 刘建刚 wrote:
>This is not the expected behavior. Can you reproduce it reliably with these steps?
>1. Restore from a savepoint;
>2. The job starts taking periodic savepoints;
>3. The job fails over.
>If so, check whether the checkpoint files exist and whether ZooKeeper was updated.
>If the problem persists, the logs are needed to dig further.
>
>董建 <62...@163.com> wrote on Fri, May 28, 2021 at 17:37:


Re: Flink on yarn: YARN resource-manager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 Thread 刘建刚
This is not the expected behavior. Can you reproduce it reliably with these steps?
1. Restore from a savepoint;
2. The job starts taking periodic savepoints;
3. The job fails over.
If so, check whether the checkpoint files exist and whether ZooKeeper was updated.
If the problem persists, the logs are needed to dig further.

董建 <62...@163.com> wrote on Fri, May 28, 2021 at 17:37:



Flink on yarn: YARN resource-manager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 Thread 董建
Here is what I am seeing.

1. The Flink version is flink-1.12.2. The start command is below; -s is passed because the job had been cancelled and is being restarted here.

flink run -d -s 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 
-t yarn-per-job -m yarn-cluser -D yarn.application.name= 
/tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod

2. flink-conf.yaml

state.checkpoints.dir: hdfs:///user/flink/checkpoints/default

3. Checkpoint setup in code

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));
   CheckpointConfig checkpointConfig = env.getCheckpointConfig();
   checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
   env.enableCheckpointing(1 * 60 * 1000);
   checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
   checkpointConfig.setTolerableCheckpointFailureNumber(100);
   checkpointConfig.setCheckpointTimeout(60 * 1000);
   checkpointConfig.setMaxConcurrentCheckpoints(1);

4. Symptoms

a) When the ops team fails over the YARN resource-manager, my Flink job restarts as well (the application-id and job-id stay the same after the restart, but the jobmanager and taskmanager move to other machines).

b) The job takes a checkpoint every minute, so if it has run for a while since the last restart, the checkpoint has advanced to, say, chk-200.

c) When the cluster team switches the resource-manager again, I expect the job to restore automatically from chk-200, but the logs show it still restores from chk-100.

d) The job consumes binlog through flink-mysql-cdc: DebeziumSourceFunction sourceMilApplysLogStream = MySQLSource.builder()

The restart makes it consume again from the binlog position of chk-100, so the binlog between chk-100 and chk-200 is consumed twice.

e) The log prints the line below. Is it because I started the job with -s, so it never looks for the latest checkpoint on restart?

2021-05-24 16:49:50,398 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.savepoint.path, 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100

The expectation is that after a restart the job resumes from the latest checkpoint, i.e. chk-200.

The actual behavior is that after every restart the job recovers from the checkpoint passed via flink -s.
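As the replies note, enabling HA is the proper fix, since only HA records the latest completed checkpoint for recovery. Short of that, a manual restart can at least resolve the newest retained checkpoint itself instead of reusing a stale -s path. A sketch under that assumption; `latest_chk` is a made-up helper, and the path reuses the job id quoted in this thread:

```shell
# Hedged sketch: pick the newest retained checkpoint directory for a job and
# feed it to `flink run -s`, so a manual restart does not reuse a stale chk-100.
CHECKPOINT_ROOT="hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2"

latest_chk() {
  # `hdfs dfs -ls` prints the full path in the last column; keep only the
  # chk-N entries and sort numerically on the N suffix.
  awk '{print $NF}' | grep '/chk-' | sort -t- -k2 -n | tail -n 1
}

# On a real cluster:  LATEST=$(hdfs dfs -ls "$CHECKPOINT_ROOT" | latest_chk)
# Demo with captured `hdfs dfs -ls` output:
LATEST=$(printf '%s\n' \
  "drwxr-xr-x   - flink flink 0 2021-05-24 16:40 $CHECKPOINT_ROOT/chk-100" \
  "drwxr-xr-x   - flink flink 0 2021-05-24 18:20 $CHECKPOINT_ROOT/chk-200" \
  | latest_chk)
echo "$LATEST"
# Then restart with:  flink run -d -s "$LATEST" -t yarn-per-job ...
```

The numeric sort assumes the only dash in the path is the chk-N suffix, as in the paths above.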

Restart fails after updating a file for flink on yarn

2021-04-30 Thread zjfpla...@hotmail.com
After stopping the Flink job I updated a related configuration file (the keytab), and then got this error:
Resource hdfs://nameservice1/user/hbase/.flink/${appid}/hbase.keytab changed on 
src filesystem (expected , was )



zjfpla...@hotmail.com


flink on yarn Kerberos authentication issue

2021-04-30 Thread zjfpla...@hotmail.com
Hi all,
Two questions:
1. On CDH, with Kerberos already managed by Cloudera Manager, changing the Kerberos configuration in CM leaves both /var/kerberos/krb5kdc/kdc.conf and /etc/krb5.conf unchanged; the settings seem to be stored somewhere else. Does anyone know where?
2. Running Flink 1.8 on CDH 5.14 YARN, after one day the job fails and terminates with "GSS initiate failed {caused by GSSException: No valid credentials provided}". I suspect the Kerberos ticket renewal failed. Did I misconfigure something, or does Flink 1.8 not support renewal yet? With spark-submit --keytab the TGT used to be renewed automatically.

Relevant server-side configuration:
1. flink-conf.yaml:
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/zjf/zjf.keytab
security.kerberos.login.principal: z...@test.com
security.kerberos.login.contexts: Client,KafkaClient
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client
2. /var/kerberos/krb5kdc/kdc.conf:
(screenshot not preserved in the archive)
3. /etc/krb5.conf:
max_renewable_life in it was added by hand later; restarting the KDC made no difference.

Looking at the Kerberos debug log, I found this section:
Forwardable Ticket true
Forwarded Ticket false
Proxiable Ticket false
Proxy Ticket false
Postdated Ticket false
Renewable Ticket false
Initial Ticket false
Auth Time =Fri Apr 30 14:38:36 CST 2021
Start Time =Fri Apr 30 14:38:36 CST 2021
End Time =Sat May 01 14:38:36 CST 2021
Renew Till = null




zjfpla...@hotmail.com
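The `Renewable Ticket false` / `Renew Till = null` lines above are the key symptom: the KDC issued a TGT that cannot be renewed at all, so no client-side setting will renew it. A hedged sketch of spotting that symptom in a captured debug log; the kadmin commands in the comments are the usual KDC-side fix, with placeholder principal names taken from the post:

```shell
# Sketch: flag a non-renewable TGT from a captured Kerberos debug log.
# KDC-side fix (run as a Kerberos admin; principals are placeholders):
#   kadmin.local -q 'modprinc -maxrenewlife 7days krbtgt/TEST.COM@TEST.COM'
#   kadmin.local -q 'modprinc -maxrenewlife 7days zjf@TEST.COM'
# then restart the job so a fresh, renewable TGT is issued.
check_renewable() {
  if grep -q 'Renewable Ticket false' "$1"; then
    echo "TGT is NOT renewable: raise maxrenewlife on the KDC"
  else
    echo "TGT looks renewable"
  fi
}

# Demo against the debug-log lines quoted in this thread:
printf 'Renewable Ticket false\nRenew Till = null\n' > /tmp/krb-debug.log
check_renewable /tmp/krb-debug.log
```

Note that both the krbtgt principal and the user principal need a non-zero maxrenewlife for the issued ticket to be renewable.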
 


Problem starting Flink on a YARN cluster

2021-04-21 Thread tanggen...@163.com
Hello, I ran into problems submitting Flink jobs to a YARN cluster and hope you can help.
I deployed a three-node Hadoop cluster with two 4c/24G worker nodes. yarn-site configures 8 vcores and 20G of usable memory per node, 16 vcores / 40G in total. I submitted two jobs, each allocated 3 vcores and 6G of memory, consuming 6 vcores / 12G overall, which the Hadoop web UI confirms (screenshot not preserved in the archive):
But when I submit a third job it fails with no obvious error in the logs, even though the cluster clearly has resources to spare. Where could the problem be? Any advice is appreciated.
Attachment 1 (console output):
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not deploy Yarn job cluster.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
at 
com.hfhj.akb.report.order.PlatformOrderStream.main(PlatformOrderStream.java:81)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1618931441017_0004 failed 2 
times in previous 1 milliseconds (global limit =4; local limit is =2) due 
to AM Container for appattempt_1618931441017_0004_03 exited with  exitCode: 
1
Failing this attempt.Diagnostics: [2021-04-20 23:34:16.067]Exception from 
container-launch.
Container id: container_1618931441017_0004_03_01
Exit code: 1

[2021-04-20 23:34:16.069]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

[2021-04-20 23:34:16.069]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

For more detailed output, check the application tracking page: 
http://master107:8088/cluster/app/application_1618931441017_0004 Then click on 
links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1618931441017_0004
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
... 22 more
Attachment 2 (Hadoop logs):
2021-04-20 23:31:40,293 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Stopping resource-monitoring for container_1618931441017_0004_01_01
2021-04-20 23:31:40,293 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got 
event CONTAINER_STOP for appId application_1618931441017_0004
2021-04-20 23:31:41,297 INFO 
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Removed 
completed containers from NM context: [container_1618931441017_0004_01_01]
2021-04-20 23:34:12,558 INFO SecurityLogger.org.apache.hadoop.ipc.Server: Auth 
successful for appattempt_1618931441017_0004_03 (auth:SIMPLE)
2021-04-20 23:34:12,565 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 

Error starting flink on yarn

2021-04-18 Thread Bruce Zhang
Submitting in flink on yarn per-job mode fails. The command is bin/flink run -m yarn-cluster -d -yjm 1024 -ytm 4096 /home/XX.jar

YARN has enough resources, and other programs submit fine; only this one fails. But if I change the command to bin/flink run -m yarn-cluster -yjm 1024 -ytm 4096 /home/testjar/XX.jar it succeeds, i.e. with the -d parameter removed. That, however, runs in session mode and also affects other programs.

Error output:

2021-04-19 10:08:13,116 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2021-04-19 10:08:13,116 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2021-04-19 10:08:13,541 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
specification: ClusterSpecification{masterMemoryMB=1024, 
taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}

2021-04-19 10:08:13,843 WARN  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
configuration directory ('/home/software/flink-1.7.0/conf') contains both LOG4J 
and Logback configuration files. Please delete or rename one of them.

2021-04-19 10:08:14,769 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting 
application master application_1618796268543_0019

2021-04-19 10:08:14,789 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
application application_1618796268543_0019

2021-04-19 10:08:14,789 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for the 
cluster to be allocated

2021-04-19 10:08:14,791 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
cluster, current state ACCEPTED






 The program finished with the following exception:




org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.

at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)

at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Caused by: 
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment.

Diagnostics from YARN: Application application_1618796268543_0019 failed 1 
times due to AM Container for appattempt_1618796268543_0019_01 exited with  
exitCode: 1

For more detailed output, check application tracking 
page:http://siact-11:8088/cluster/app/application_1618796268543_0019Then, click 
on links to logs of each attempt.

Diagnostics: Exception from container-launch.

Container id: container_e24_1618796268543_0019_01_01

Exit code: 1

Stack trace: ExitCodeException exitCode=1:

at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)

at org.apache.hadoop.util.Shell.run(Shell.java:482)

at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)

at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)







Container exited with a non-zero exit code 1

Failing this attempt. Failing the application.

If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:

yarn logs -applicationId application_1618796268543_0019

at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)

at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545

Re: flink on yarn: connection refused with multiple TaskManagers

2021-04-12 Thread haihua
Hi, did you solve this in the end? Could you share any ideas?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink on yarn session mode fails to communicate with YARN (job mode works)

2021-03-22 Thread 刘乘九
Thanks a lot! I tried it but it did not help. Both parameters are configured, and the startup log shows values matching the configuration. The comments above them say they only take effect in standalone mode, and the odd part is that per-job submits fine while session mode cannot connect. By the way, my version is 1.11.2; please take another look when you have a moment.



On 2021-03-23 09:28:20, "wxpcc" wrote:
>For the first problem, try setting jobmanager.rpc.address and jobmanager.rpc.port in flink-conf.yaml
>Not sure about the second problem
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink on yarn session mode fails to communicate with YARN (job mode works)

2021-03-22 Thread wxpcc
For the first problem, try setting jobmanager.rpc.address and jobmanager.rpc.port in flink-conf.yaml
Not sure about the second problem


--
Sent from: http://apache-flink.147419.n8.nabble.com/
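A minimal flink-conf.yaml fragment along the lines of the reply above (host and ports are placeholders; note these settings mainly matter for standalone setups, since on YARN the client normally discovers the JobManager through the YARN application):

```yaml
jobmanager.rpc.address: jobmanager-host.example.com
jobmanager.rpc.port: 6123
rest.port: 8081
```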


flink on yarn session mode fails to communicate with YARN (job mode works)

2021-03-22 Thread 刘乘九
A question for the experts:
I have always submitted jobs in job mode without trouble. Recently session mode fits a new requirement better, but when I submit following the tutorials on this list, it keeps failing to connect to the resource manager. The startup logs show the two modes connect to different resource manager endpoints: one uses the correct port, the other keeps requesting a local port.

session mode startup log:
(screenshot not preserved in the archive)

job mode startup log:
(screenshot not preserved in the archive)

My questions:
1. How do I configure the resource manager port for session mode?
2. In job mode, suppose server A is an 8-core taskmanager configured with 16 slots. After a job with parallelism 5 is placed on server A, further job-mode submissions can no longer assign tasks to server A. Is there a way to use the remaining idle slots? Or is the design intent that for small jobs, reducing message passing shortens completion time?






 



Flink on yarn per-job HA

2021-03-19 Thread Ink????
Hi all,
How should high availability (HA) be configured for Flink 1.12 running on YARN in per-job mode?

Re: Flink On Yarn Per Job submission failure

2021-02-24 Thread Robin Zhang
Hi 凌战,
Check whether the Hadoop environment variables are set correctly. See the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation

Best,
Robin


凌战 wrote
> Hi all,
> After setting the user to hdfs on the API side and scheduling the job, the relevant files do exist under /user/hdfs/.flink/application-id, e.g.
> -rw-r--r--   3 hdfs supergroup       9402 2021-02-24 11:02 /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
> -rw-r--r--   3 hdfs supergroup       1602 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
> -rw-r--r--   3 hdfs supergroup      32629 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
> -rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar
>
> But it fails with: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
> The uploaded files have permissions -rw-r--r--; could this be a permissions problem?
>
> Hoping someone can clarify!
>
> 凌战
> m18340872285@



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink On Yarn Per Job submission failure

2021-02-23 Thread 凌战
Hi all,
After setting the user to hdfs on the API side and scheduling the job, the relevant files do exist under /user/hdfs/.flink/application-id, e.g.
-rw-r--r--   3 hdfs supergroup       9402 2021-02-24 11:02 /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
-rw-r--r--   3 hdfs supergroup       1602 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
-rw-r--r--   3 hdfs supergroup      32629 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
-rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar

But it fails with: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
The uploaded files have permissions -rw-r--r--; could this be a permissions problem?

Hoping someone can clarify!

凌战
m18340872...@163.com

Re: Submitting a jar via a local API to a Flink on Yarn cluster fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread Smile
Hi,
The class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint should live in the flink-yarn module and is bundled into flink-dist as a dependency when the lib package is built.
Why did you add both flink-dist_2.11-1.10.1.jar and flink-yarn_2.11-1.11.1.jar? Won't the two conflict?

Smile



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Submitting a jar via a local API to a Flink on Yarn cluster fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread Smile@LETTers
Hi, the class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint should live in the flink-yarn module and is bundled into flink-dist as a dependency when the lib package is built. Why did you add both flink-dist_2.11-1.10.1.jar and flink-yarn_2.11-1.11.1.jar? Won't the two conflict? Smile

Re: Submitting a jar via a local API to a Flink on Yarn cluster fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread 凌战
The jar list in my message above did not come through; to fill it in, besides the user jar the dependency jars added are:
flink-dist_2.11-1.10.1.jar
flink-queryable-state-runtime_2.11-1.10.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
flink-table-blink_2.11-1.10.1.jar
flink-table_2.11-1.10.1.jar
flink-yarn_2.11-1.11.1.jar

凌战
m18340872...@163.com

On 2021-02-23 19:02, 凌战 wrote:


List<URL> userClassPaths = new ArrayList<>();
File file = ResourceUtils.getFile(new URL(Objects.requireNonNull(
        this.getClass().getClassLoader().getResource("")).toString() + "lib"));
// The condition below was garbled by the archive ("&()");
// presumably it checked that listFiles() is non-null.
if (file.isDirectory() && file.listFiles() != null) {
    for (File ele : Objects.requireNonNull(file.listFiles())) {
        userClassPaths.add(ele.toURI().toURL());
    }
}

// Build the PackagedProgram
PackagedProgram packagedProgram =
        PackagedProgram.newBuilder()
                .setJarFile(jar)
                .setUserClassPaths(userClassPaths)
                .build();

// Locate the configuration directory
String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();

// 2. Load the global configuration (flink-conf.yaml) into a Configuration
Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

// 3. Register the jars
ConfigUtils.encodeCollectionToConfig(
        configuration,
        PipelineOptions.JARS,
        packagedProgram.getJobJarAndDependencies(),
        URL::toString
);

ConfigUtils.encodeCollectionToConfig(
        configuration,
        PipelineOptions.CLASSPATHS,
        packagedProgram.getClasspaths(),
        URL::toString
);

Pipeline pipeline = this.wrapClassLoader(packagedProgram.getUserCodeClassLoader(), () -> {
    try {
        return PackagedProgramUtils.getPipelineFromProgram(
                packagedProgram, configuration, 10, false);
    } catch (ProgramInvocationException e) {
        e.printStackTrace();
        return null;
    }
});

// yarn-per-job mode
return new PlatformAbstractJobClusterExecutor<>(new YarnClusterClientFactory())
        .execute(pipeline, configuration, packagedProgram.getUserCodeClassLoader())
        .get().getJobID().toString();
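The directory scan at the top of the snippet above depends on a condition the archive garbled (the && and the List's type parameter were eaten as HTML). A self-contained sketch of the presumed intent, runnable without any Flink dependency (class and directory names are illustrative only):

```java
import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

public class UserClassPathScanner {

    // Turn every file in a lib directory into a classpath URL,
    // guarding against listFiles() returning null.
    static List<URL> collectUserClassPaths(File libDir) throws Exception {
        List<URL> urls = new ArrayList<>();
        File[] entries = libDir.listFiles();
        if (libDir.isDirectory() && entries != null) {
            for (File entry : entries) {
                urls.add(entry.toURI().toURL());
            }
        }
        return urls;
    }

    public static void main(String[] args) throws Exception {
        // Demo against a fresh temp directory holding one fake jar.
        File libDir = Files.createTempDirectory("lib-demo").toFile();
        new File(libDir, "user-code.jar").createNewFile();
        System.out.println(collectUserClassPaths(libDir).size());
    }
}
```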






The dependency jars added here are as follows:



But the following error appears:


2021-02-23 18:49:23.404  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.api.java.utils.PlanGenerator   : The job has 0 registered types and 0 
default Kryo serializers
2021-02-23 18:50:38.746  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : No path for the flink jar passed. 
Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to 
locate the jar
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:41.159  INFO 26116 --- [nio-8080-exec-4] 
.h.y.c.ConfiguredRMFailoverProxyProvider : Failing over to rm80
2021-02-23 18:50:41.848  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Cluster specification: 
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, 
slotsPerTaskManager=1}
2021-02-23 18:50:41.849  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:42.555  WARN 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Environment variable 'FLINK_LIB_DIR' 
not set and ship files have not been provided manually. Not shipping any 
library files.
2021-02-23 18:50:42.556  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:48.554  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Submitting application master 
application_1610671284452_0243
2021-02-23 18:50:49.133  INFO 26116 --- [nio-8080-exec-4] 
