The logs show 2 interesting pieces of information:

<tasks are submitted>
...
2019-12-19 18:33:23,278 INFO org.apache.kafka.clients.FetchSessionHandler                  - [Consumer clientId=consumer-4, groupId=ccccccdb-prod-import] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.DisconnectException.
...
2019-12-19 19:37:06,732 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@xxxxxx-job-0002:36835/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@xxxxxx-job-0002:36835/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..

This reads like the machine lost network connectivity for some reason. The tasks start failing because Kafka cannot be reached, and the TaskManager then shuts down because it cannot reach the ResourceManager either.

On 25/12/2019 04:34, Zhijiang wrote:
If you use the RocksDB state backend, it might consume extra native memory.
Some resource frameworks such as YARN will kill a container if its memory usage exceeds a certain threshold. You could double-check whether that is happening in your case.
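
For reference, a minimal sketch of how the RocksDB backend is typically enabled in a Flink 1.8 job, which makes it easy to check whether it is in use at all (the checkpoint path below is hypothetical):

    // Requires the flink-statebackend-rocksdb dependency.
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbBackendCheck {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // If a call like this appears in the job (or state.backend: rocksdb is set in
            // flink-conf.yaml), RocksDB is in use and its native, off-heap memory is not
            // bounded by the JVM heap settings.
            env.setStateBackend(new RocksDBStateBackend("file:///mnt/glusterfs/flink/checkpoints"));
        }
    }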

    ------------------------------------------------------------------
    From:John Smith <java.dev....@gmail.com>
    Send Time:2019 Dec. 25 (Wed.) 03:40
    To:Zhijiang <wangzhijiang...@aliyun.com>
    Cc:user <user@flink.apache.org>
    Subject:Re: Flink task node shut itself off.

    The shutdown happened after the massive IO wait. I don't use any
    state; checkpoints are disk based...

    On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang,
    <wangzhijiang...@aliyun.com> wrote:
    Hi John,

    Thanks for the positive comments on your Flink usage. Whether you
    use at-least-once or exactly-once checkpointing, it will never
    lose a message during failure recovery.

    Unfortunately I cannot access the logs you posted. Generally
    speaking, a longer checkpoint interval means replaying more
    source data after failure recovery.
    In my experience a 5-second checkpoint interval is too frequent;
    you might increase it to 1 minute or so. You can also monitor how
    long checkpoints take to complete in your application and adjust
    the interval accordingly.
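
    For reference, a minimal sketch (assuming a standard DataStream job on
    Flink 1.8) of moving from a 5-second to a 1-minute checkpoint interval
    and giving slow checkpoints some breathing room:

        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class CheckpointIntervalSketch {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // Checkpoint every 60 s instead of every 5 s.
                env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
                // Leave at least 30 s between the end of one checkpoint and the start of the next.
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
                // Abort a checkpoint that takes longer than 10 minutes.
                env.getCheckpointConfig().setCheckpointTimeout(600_000L);

                // ... sources, operators and sinks would be defined here, then env.execute(...);
            }
        }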

    Concerning the node shutdown you mentioned, I am not quite sure
    whether it is related to your short checkpoint interval. Are you
    configured to use the heap state backend? The hs_err file does
    indicate that your job ran into a memory issue, so it would be
    better to increase your task manager memory. If you can also
    analyze the hs_err dump file with a profiling tool to check the
    memory usage, that might be more helpful for finding the root cause.
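
    For reference, a minimal sketch (Flink 1.8, hypothetical GlusterFS
    path) of setting the filesystem state backend explicitly instead of
    relying on the default heap/memory backend; the TaskManager heap
    itself is sized via taskmanager.heap.size in flink-conf.yaml:

        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class StateBackendSketch {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // FsStateBackend keeps working state on the TaskManager heap but writes
                // checkpoint data to the configured path, so checkpoints end up on disk
                // while heap sizing (taskmanager.heap.size) still matters for live state.
                env.setStateBackend(new FsStateBackend("file:///mnt/glusterfs/flink/checkpoints"));
            }
        }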

    Best,
    Zhijiang

    ------------------------------------------------------------------
    From:John Smith <java.dev....@gmail.com>
    Send Time:2019 Dec. 21 (Sat.) 05:26
    To:user <user@flink.apache.org>
    Subject:Flink task node shut itself off.

    Hi, using Flink 1.8.0

    First off I must say Flink's resiliency is very impressive: we lost
    a node and never lost a single message thanks to checkpoints and
    Kafka. Thanks!

    The cluster is self-hosted and we use our own ZooKeeper cluster.
    We have...
    3 ZooKeeper nodes: 4 CPU, 8 GB (each)
    3 job nodes: 4 CPU, 8 GB (each)
    3 task nodes: 4 CPU, 8 GB (each)
    The nodes also share GlusterFS for storing savepoints and
    checkpoints; GlusterFS is running on the same machines.

    Yesterday a node shut itself off with the following log messages...
    - Stopping TaskExecutor
    akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
    - Stop job leader service.
    - Stopping ZooKeeperLeaderRetrievalService
    /leader/resource_manager_lock.
    - Shutting down TaskExecutorLocalStateStoresManager.
    - Shutting down BLOB cache
    - Shutting down BLOB cache
    - removed file cache directory
    /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
    - I/O manager removed spill file directory
    /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
    - Shutting down the network environment and its components.

    Prior to the node shutting off we noticed massive IOWAIT of 140%
    and a 1-minute CPU load of 15. We also got an hs_err file which
    says we should increase the memory.

    I'm attaching the logs here:
    https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

    I wonder if my 5-second checkpoint interval is too much for GlusterFS.

    Any thoughts?
