Hi Yun,

Thanks for your thoughts.  Answers to your questions:

>  1. "after around 50GB of state, I stop being able to reliably take
>     checkpoints or savepoints. "
>     What is the exact reason that job cannot complete checkpoint?
>     Expired before completing or decline by some tasks? The former one
>     is manly caused by high back-pressure and the later one is mainly
>     due to some internal error.

In the UI, under Job | Checkpoints | History, opening the checkpoint detail shows that the checkpoints fail because some operators never acknowledge them. It is always a subset of the operators with the larger state that stops acknowledging, and the exact set of operators varies nondeterministically. The checkpoints frequently fail before any timeout that I impose on them.
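
By "timeout" I mean the checkpoint timeout on CheckpointConfig, set along these lines (the 30-minute value here is illustrative, not my exact setting):

    env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000);  // 30 minutes, illustrative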

>  2. Have you checked why the remote task manager is lost?
>     If the remote task manager is not crashed, it might be due to GC
>     impact; I think you might need to check task-manager logs and GC logs.

The only general pattern I have observed is:

    1) Some taskmanager A throws one of the various connectivity
    exceptions I listed, complaining about another taskmanager B.
    2) Taskmanager B shows no obvious error other than complaining
    that taskmanager A has disconnected from it.

Regards,


Jeff Henrikson



On 6/17/20 9:52 PM, Yun Tang wrote:
Hi Jeff

 1. "after around 50GB of state, I stop being able to reliably take
    checkpoints or savepoints. "
    What is the exact reason that job cannot complete checkpoint?
    Expired before completing or decline by some tasks? The former one
    is manly caused by high back-pressure and the later one is mainly
    due to some internal error.
 2. Have you checked why the remote task manager is lost?
    If the remote task manager is not crashed, it might be due to GC
    impact; I think you might need to check task-manager logs and GC logs.

Best
Yun Tang
------------------------------------------------------------------------
*From:* Jeff Henrikson <jehenri...@gmail.com>
*Sent:* Thursday, June 18, 2020 1:46
*To:* user <user@flink.apache.org>
*Subject:* Trouble with large state
Hello Flink users,

I have an application of around 10 enrichment joins.  All events are
read from Kafka and have event timestamps.  The joins are built using
.coGroup, with a global window, triggering on every event, plus a
custom evictor that drops records once a newer record for the same ID
has been processed.  Deletes are represented by empty events carrying
only a timestamp and ID (tombstones).  That way, we can drop records
when business logic dictates, as opposed to when a maximum retention
period is reached.  The application runs the RocksDBStateBackend, on
Kubernetes on AWS with local SSDs.
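
To make the shape of one of these joins concrete, here is a minimal
sketch (not the production code; Event, EnrichFunction, and the evictor
below are simplified stand-ins for my real types):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.evictors.Evictor;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
    import org.apache.flink.util.Collector;

    public class JoinSketch {

        /** Simplified event: a business ID plus a payload; an empty payload is a tombstone. */
        public static class Event {
            public String id;
            public String payload;
        }

        /** One enrichment join: global window, fire on every element, evict superseded records. */
        static DataStream<Event> enrich(DataStream<Event> left, DataStream<Event> right) {
            return left
                .coGroup(right)
                .where(e -> e.id)                        // key both sides by business ID
                .equalTo(e -> e.id)
                .window(GlobalWindows.create())          // state lives until the evictor drops it
                .trigger(CountTrigger.of(1))             // emit on every arriving element
                .evictor(new DropSupersededEvictor())    // keep only the newest record per ID
                .apply(new EnrichFunction());
        }

        /** Stand-in for the real business logic. */
        public static class EnrichFunction implements CoGroupFunction<Event, Event, Event> {
            @Override
            public void coGroup(Iterable<Event> lefts, Iterable<Event> rights, Collector<Event> out) {
                // business join logic elided
            }
        }

        /**
         * Drops records once a newer record for the same ID has been processed,
         * using the iterator.remove() pattern of Flink's built-in evictors.
         */
        public static class DropSupersededEvictor
                implements Evictor<TaggedUnion<Event, Event>, GlobalWindow> {

            @Override
            public void evictBefore(Iterable<TimestampedValue<TaggedUnion<Event, Event>>> elements,
                                    int size, GlobalWindow window, EvictorContext ctx) {
                // nothing to evict before the window function runs
            }

            @Override
            public void evictAfter(Iterable<TimestampedValue<TaggedUnion<Event, Event>>> elements,
                                   int size, GlobalWindow window, EvictorContext ctx) {
                // First pass: newest timestamp seen per ID.
                Map<String, Long> newest = new HashMap<>();
                for (TimestampedValue<TaggedUnion<Event, Event>> e : elements) {
                    newest.merge(idOf(e.getValue()), e.getTimestamp(), Math::max);
                }
                // Second pass: remove every record older than the newest one for its ID.
                Iterator<TimestampedValue<TaggedUnion<Event, Event>>> it = elements.iterator();
                while (it.hasNext()) {
                    TimestampedValue<TaggedUnion<Event, Event>> e = it.next();
                    if (e.getTimestamp() < newest.get(idOf(e.getValue()))) {
                        it.remove();
                    }
                }
            }

            private static String idOf(TaggedUnion<Event, Event> u) {
                return u.isOne() ? u.getOne().id : u.getTwo().id;
            }
        }
    }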

Unit tests show that the joins produce expected results.  On an 8-node
cluster, watermark output progress seems to indicate that I should be
able to bootstrap my state of around 500GB in around 1 day.  I am able
to save and restore savepoints for the first half hour of run time.

My current trouble is that after around 50GB of state, I stop being able
to reliably take checkpoints or savepoints.  Some time after that, I
start getting a variety of failures where the first suspicious log event
is a generic cluster connectivity error, such as:

      1) java.io.IOException: Connecting the channel failed: Connecting
      to remote task manager + '/10.67.7.101:38955' has failed. This
      might indicate that the remote task manager has been lost.

      2) org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
      Connection unexpectedly closed by remote task manager 'null'. This
      might indicate that the remote task manager was lost.

      3) Association with remote system
      [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
      gated for [50] ms. Reason: [Association failed with
      [akka.tcp://flink@10.67.6.66:34987]] Caused by:
      [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum
savable state size.

I could enable HA, but for the time being I have been leaving it out to
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
      parallelism=8
      maxParallelism=64
      setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
      setTolerableCheckpointFailureNumber(1000)
      setMaxConcurrentCheckpoints(1)
      enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
      RocksDBStateBackend
      setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
      setNumberOfTransferThreads(25)
      setDbStoragePath points to a local nvme SSD
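
For completeness, a sketch of how those settings are wired up (the
checkpoint interval, checkpoint URI, and local storage path below are
placeholders, not my actual values):

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobConfigSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            env.setMaxParallelism(64);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            env.enableCheckpointing(60_000L);  // interval illustrative
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://<bucket>/checkpoints", true);  // URI is a placeholder
            backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
            backend.setNumberOfTransferThreads(25);
            backend.setDbStoragePath("/mnt/nvme/rocksdb");  // local NVMe SSD; path is a placeholder
            env.setStateBackend(backend);

            // ... build the job graph, then env.execute(...)
        }
    }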

Configuration in flink-conf.yaml:

      jobmanager.rpc.address: localhost
      jobmanager.rpc.port: 6123
      jobmanager.heap.size: 28000m
      taskmanager.memory.process.size: 28000m
      taskmanager.memory.jvm-metaspace.size: 512m
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 1
      jobmanager.execution.failover-strategy: full

      cluster.evenly-spread-out-slots: false

      taskmanager.memory.network.fraction: 0.2           # default 0.1
      taskmanager.memory.framework.off-heap.size: 2GB
      taskmanager.memory.task.off-heap.size: 2GB          # default 128MB
      taskmanager.network.memory.buffers-per-channel: 32  # default 2
      taskmanager.memory.managed.fraction: 0.4            # docs say default 0.1, but something seems to set 0.4

      state.backend.fs.memory-threshold: 1048576
      state.backend.fs.write-buffer-size: 10240000
      state.backend.local-recovery: true
      state.backend.rocksdb.writebuffer.size: 64MB
      state.backend.rocksdb.writebuffer.count: 8
      state.backend.rocksdb.writebuffer.number-to-merge: 4
      state.backend.rocksdb.timer-service.factory: heap
      state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
      state.backend.rocksdb.write-batch-size: 16000000 # default 2MB

      web.checkpoints.history: 250
