Hi all,

I'm trying to use the State Processor API to extract all keys from a
RocksDB savepoint produced by an operator in a Flink streaming job into CSV
files.

The problem is that the savepoint is 30TB in size and I'm running into
garbage collection issues no matter how much memory (in different
proportions) or how many CPU cores I allocate to task managers. (I tried
allocating up to 120GB and 16 cores to each task.)

The same program and hardware configuration works with no problems for a
smaller savepoint (300GB), so this looks like some sort of scalability issue.

At the beginning the tasks spend a couple hours in what I call "the
download phase". During that phase heap usage as indicated by metrics and
Flink UI is at about 10% and everything is going great.

But at a certain point heap usage for tasks coming out of the download phase
starts to go up and climbs to about 87%, as indicated in the Flink UI and
by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap
usage metric stops increasing and the JVM starts spending a lot of time
collecting garbage, keeping all CPUs 100% loaded. After some time in
this mode the job crashes with "java.util.concurrent.TimeoutException:
Heartbeat of TaskManager with id container_1614821414188_0002_01_000035
timed out."
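
The heap and GC behaviour described above can be cross-checked from inside
the JVM with the standard java.lang.management beans. The following is only a
minimal sketch and is not part of the job in the gist: the class name
HeapGcProbe and its method names are hypothetical, and the GC time it reports
is the collectors' accumulated elapsed time, which is an approximation of the
real pause time.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper: samples heap usage and GC time via standard JMX beans.
public class HeapGcProbe {

    /** Fraction of the maximum heap currently in use, or -1 if the max is undefined. */
    public static double heapUsedFraction() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax(); // -1 when the JVM reports no defined maximum
        return max > 0 ? (double) heap.getUsed() / max : -1.0;
    }

    /** Accumulated GC time in milliseconds since JVM start, summed over all collectors. */
    public static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report it
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Share of an interval spent in GC, given two samples of totalGcMillis(). */
    public static double gcShare(long gcBefore, long gcAfter, long intervalMillis) {
        if (intervalMillis <= 0) {
            return 0.0;
        }
        return (double) (gcAfter - gcBefore) / intervalMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        long intervalMillis = 200;
        long before = totalGcMillis();
        Thread.sleep(intervalMillis);
        long after = totalGcMillis();
        System.out.printf("heap used: %.1f%%, GC share: %.1f%%%n",
                heapUsedFraction() * 100, gcShare(before, after, intervalMillis) * 100);
    }
}
```

A GC share that stays close to 100% while the heap fraction sits flat below
100% would match the symptoms above: the collector runs almost continuously
but reclaims very little.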

At all times the indicated managed memory usage is 0%, which seems
suspicious since RocksDB is supposed to be using it.

Also, judging by the lack of an application metric I have in the state
processor operator, KeyedStateReaderFunction.readKey never gets called.

I would appreciate it if somebody could help answer some of my questions or
suggest a way I could further diagnose/fix this:

1. Is it normal that this overwhelming garbage collection starts long
before reaching 100% heap usage? At the time it happens there's usually
10-15GB of heap showing up as available.

2. Am I correct to assume that even in batch mode Flink implements memory
back pressure and is supposed to slow down processing/allocations when it's
low on available heap memory?

3. If #2 is true, is it possible that due to some misconfiguration Flink
considers more heap space to be available than there actually is and keeps
allocating even though there's no more heap?

4. As an alternative to #3, is it possible that there are some unaccounted
heap allocations that are not shown in the UI and by the metric and
therefore not taken into account by the memory back pressure mechanism?

Here's the minimal code example that demonstrates the issue:
https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f

I'm running this on Flink 1.12.2 (and many earlier versions, too) with the
following base configuration and a parallelism of 80 (I tried lowering that
to have more resources available, too):
https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025

I tried many things with no success:
- reducing parallelism and making more resources available to each task
manager
- enabling object reuse and modifying the tuple mapper to avoid extra tuple
allocations
- adjusting memory ratios to allocate more memory to heap or to managed
memory
- allocating 20% of memory for JVM overhead
- switching to G1GC garbage collector

Again, would appreciate any help with this.

-- 
With regards,
Andrey Bulgakov
