Re: Flink task node shut itself off.

2020-01-15 Thread John Smith
Hi, so far it seems stable.


Re: Flink task node shut itself off.

2020-01-06 Thread John Smith
So I increased all the jobs to a 1 minute checkpoint interval... I'll let you know
how it goes... or if I need to rethink Gluster lol


Re: Flink task node shut itself off.

2020-01-04 Thread John Smith
It seems to have happened again... Here is a screen shot of the system
metrics for that day on that particular node

https://www.dropbox.com/s/iudn7z2fvvy7vb8/flink-node.png?dl=0



Re: Flink task node shut itself off.

2020-01-03 Thread John Smith
Well, there was this huge IO wait spike of over 140%. IO wait rose slowly for a
couple of hours, then at some point it spiked to 140%, and after IO wait dropped
back to "normal" the 1/5/15 minute CPU load spiked to about 3 times the number
of cores for a bit.

We were at "peak" operation, i.e. we were running a batch job when this
happened. During average operation the "business" requests per second from our
services are about 15 RPS; when we do batches we can hit 600 RPS for a few
hours and then drop back down. Each business request underneath does a few round
trips back and forth between Kafka, cache systems, Flink, DBs, etc... So the
Flink jobs see a subset of some parts of that 600 RPS.

On the Flink side we have 3 task managers of 4 cores and 8GB each, configured as
8 slots, a 5.4GB JVM and 3.77GB Flink managed memory per task manager. We have 8
jobs and 9 slots free, so the cluster isn't full yet. But we do see that one
node is full.

We use disk-based FS state (backed by GlusterFS), not RocksDB. We had enabled
5 second checkpointing for 6 of the jobs... So I'm just wondering if that was
possibly the reason for the IO wait... Regardless of the RPS mentioned above,
the jobs will always checkpoint every 5 seconds... I had the chance to increase
the checkpoint interval for a few of the jobs before the holidays. I am back on
Monday...


Re: Flink task node shut itself off.

2020-01-03 Thread Chesnay Schepler

The logs show 2 interesting pieces of information:


...
2019-12-19 18:33:23,278 INFO  org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-4, groupId=ccdb-prod-import] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.DisconnectException.
...
2019-12-19 19:37:06,732 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@xx-job-0002:36835/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@xx-job-0002:36835/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..


This reads like the machine lost network connectivity for some reason.
The tasks start failing because Kafka cannot be reached, and the TM then
shuts down because it cannot reach the ResourceManager either.



Re: Flink task node shut itself off.

2019-12-24 Thread Zhijiang
If you use the RocksDB state backend, it might consume extra native memory.
Some resource frameworks such as YARN will kill the container if the memory
usage exceeds a threshold. You could double check whether that is happening
in your case.


--
From:John Smith 
Send Time:2019 Dec. 25 (Wed.) 03:40
To:Zhijiang 
Cc:user 
Subject:Re: Flink task node shut it self off.

The shutdown happened after the massive IO wait. I don't use any state 
Checkpoints are disk based...
On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang,  wrote:

Hi John,

Thanks for the positive comments of Flink usage. No matter at least-once or 
exactly-once you used for checkpoint, it would never lose one message during 
failure recovery.

Unfortunatelly I can not visit the logs you posted. Generally speaking the 
longer internal checkpoint would mean replaying more source data after failure 
recovery.
In my experience the 5 seconds interval for checkpoint is too frequently in my 
experience, and you might increase it to 1 minute or so. You can also monitor 
how long will the checkpoint finish in your application, then you can adjust 
the interval accordingly.

Concerning of the node shutdown you mentioned, I am not quite sure whether it 
is relevant to your short checkpoint interval. Do you config to use heap state 
backend?  The hs_err file really indicated that you job had encountered the 
memory issue, then it is better to somehow increase your task manager memory. 
But if you can analyze the dump hs_err file via some profiler tool for checking 
the memory usage, it might be more helpful to find the root cause.

Best,
Zhijiang 

--
From:John Smith 
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user 
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and 
never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We 
have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, 
GlusterFS is running on the same machines.

Yesterday a node shut itself off we the following log messages...
- Stopping TaskExecutor 
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory 
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory 
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and CPU load 
1minute of 15. And we also got an hs_err file which sais we should increase the 
memory.

I'm attaching the logs here: 
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?








Re: Flink task node shut itself off.

2019-12-24 Thread John Smith
The shutdown happened after the massive IO wait. I don't use any state;
checkpoints are disk based...



Re: Flink task node shut itself off.

2019-12-22 Thread Zhijiang
Hi John,

Thanks for the positive comments on using Flink. Whether you use at-least-once or
exactly-once checkpointing, it will never lose a message during failure recovery.

Unfortunately I cannot access the logs you posted. Generally speaking, a longer
checkpoint interval means replaying more source data after failure recovery.
In my experience a 5 second checkpoint interval is too frequent, and you might
increase it to 1 minute or so. You can also monitor how long checkpoints take to
complete in your application and adjust the interval accordingly.

Concerning the node shutdown you mentioned, I am not quite sure whether it is
related to your short checkpoint interval. Are you configured to use the heap
state backend? The hs_err file indicates that your job encountered a memory
issue, so it would be better to increase your task manager memory. If you can
analyze the hs_err dump with a profiler tool to check the memory usage, that
might be more helpful for finding the root cause.

Best,
Zhijiang 
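
For reference, the tuning described above might look roughly like this with the
DataStream API (a sketch; the one minute interval and 30 second pause are
illustrative values, not a tested recommendation for this particular job):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);              // checkpoint every 1 minute instead of every 5 seconds
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(30_000); // give the storage layer time to recover between checkpoints
config.setMaxConcurrentCheckpoints(1);        // never let checkpoints overlap

Checkpoint durations can then be watched (for example in the web UI's checkpoint
statistics) and the interval adjusted from there.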


--
From:John Smith 
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user 
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and 
never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We 
have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, 
GlusterFS is running on the same machines.

Yesterday a node shut itself off we the following log messages...
- Stopping TaskExecutor 
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory 
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory 
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and CPU load 
1minute of 15. And we also got an hs_err file which sais we should increase the 
memory.

I'm attaching the logs here: 
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?







Re: Flink task node shut itself off.

2019-12-20 Thread jingjing bai
Hi John,

In our experience, we set the checkpoint interval to 1-10 minutes and the
timeout usually to 5x the interval; mostly we use a 2 or 5 minute interval with
a 10 or 20 minute timeout.
It depends on your data volume per second and which windows are used.
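
Expressed as code, that rule of thumb might look like this (a sketch assuming
the DataStream API; the 2 minute interval is just one of the values mentioned
above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

long interval = 2 * 60 * 1000L;  // 2 minute checkpoint interval
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(interval);
env.getCheckpointConfig().setCheckpointTimeout(5 * interval); // timeout ~5x the interval (10 minutes)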


Flink task node shut itself off.

2019-12-20 Thread John Smith
Hi, using Flink 1.8.0

First off, I must say Flink's resiliency is very impressive: we lost a node and
never lost a single message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster.
We have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints,
GlusterFS is running on the same machines.

Yesterday a node shut itself off with the following log messages...
- Stopping TaskExecutor akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed a massive IO wait of 140% and a
1-minute CPU load of 15. We also got an hs_err file which says we should
increase the memory.

I'm attaching the logs here:
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?