If anyone is interested: I realized that the State Processor API was not
the right tool for this, since it spends a lot of time rebuilding RocksDB
tables and then a lot of memory reading them back. All I really needed was
the operator's keys.

So I used SavepointLoader.loadSavepointMetadata to get KeyGroupsStateHandle
objects and built an InputFormat heavily based on the code I found
in RocksDBFullRestoreOperation.java.

It ended up working extremely quickly while keeping memory and CPU usage
to a minimum.
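
In case it helps anyone, the metadata/handle enumeration part looked
roughly like this (a simplified sketch, not the exact code; the savepoint
path and the printing are placeholders, and SavepointLoader,
CheckpointMetadata and friends are internal Flink classes that may change
between versions):

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.state.api.runtime.SavepointLoader;

public class ListKeyGroupHandles {

    public static void main(String[] args) throws Exception {
        // Load only the savepoint metadata -- no state backend is rebuilt here.
        CheckpointMetadata metadata =
                SavepointLoader.loadSavepointMetadata("hdfs:///path/to/savepoint");

        for (OperatorState operatorState : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtask : operatorState.getStates()) {
                for (KeyedStateHandle handle : subtask.getManagedKeyedState()) {
                    if (!(handle instanceof KeyGroupsStateHandle)) {
                        continue; // only interested in full-snapshot keyed state
                    }
                    KeyGroupsStateHandle kgHandle = (KeyGroupsStateHandle) handle;

                    // Every (key group, offset) pair can become one input split.
                    kgHandle.getGroupRangeOffsets().forEach(kgOffset ->
                            System.out.printf("operator=%s keyGroup=%d offset=%d%n",
                                    operatorState.getOperatorID(), kgOffset.f0, kgOffset.f1));

                    // In the InputFormat itself: open the stream, seek to the key
                    // group's offset and deserialize the key/value entries the same
                    // way RocksDBFullRestoreOperation does (serialization proxy
                    // first, then the per-key-group entries).
                    try (FSDataInputStream in = kgHandle.openInputStream()) {
                        // ... read keys here ...
                    }
                }
            }
        }
    }
}

Splitting the work by (handle, key group) made it easy to parallelize
without ever instantiating RocksDB.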

On Tue, Mar 9, 2021 at 1:51 PM Andrey Bulgakov <m...@andreiko.ru> wrote:

> Hi all,
>
> I'm trying to use the State Processor API to extract all keys from a
> RocksDB savepoint, produced by an operator in a Flink streaming job, into
> CSV files.
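>
> In essence the job looks like this (a simplified sketch with placeholder
> key type, uid and paths; the full version is in the gist linked below):
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.state.api.Savepoint;
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
> import org.apache.flink.util.Collector;
>
> public class ExtractKeys {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         DataSet<String> keys = Savepoint
>                 .load(env, "hdfs:///path/to/savepoint", new RocksDBStateBackend("file:///tmp/rocksdb"))
>                 .readKeyedState("my-operator-uid", new KeyReader());
>
>         keys.writeAsText("hdfs:///path/to/output");
>         env.execute("extract-keys");
>     }
>
>     static class KeyReader extends KeyedStateReaderFunction<String, String> {
>         ValueState<Long> state;
>
>         @Override
>         public void open(Configuration parameters) {
>             // placeholder descriptor; the real job registers the operator's actual state
>             state = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("some-state", Long.class));
>         }
>
>         @Override
>         public void readKey(String key, Context ctx, Collector<String> out) {
>             out.collect(key); // only the key itself is needed
>         }
>     }
> }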
>
> The problem is that the storage size of the savepoint is 30TB and I'm
> running into garbage collection issues no matter how much memory (in
> different proportions) or how many CPU cores I allocate to the task
> managers. (I tried allocating up to 120GB and 16 cores to each task
> manager.)
>
> The same program and hardware configuration works with no problems for a
> smaller savepoint (300GB), so this looks like some sort of scalability
> issue.
>
> At the beginning the tasks spend a couple of hours in what I call "the
> download phase". During that phase heap usage, as indicated by the metrics
> and the Flink UI, sits at about 10% and everything goes great.
>
> But at a certain point, heap usage for tasks coming out of the download
> phase starts to go up and climbs to about 87%, as indicated in the Flink UI
> and by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap
> usage metric stops increasing and the JVM starts spending a lot of time
> collecting garbage, keeping all CPUs 100% loaded. After some time in this
> mode the job crashes with "java.util.concurrent.TimeoutException:
> Heartbeat of TaskManager with id container_1614821414188_0002_01_000035
> timed out."
>
> At all times the indicated managed memory usage is 0%, which seems
> suspicious since RocksDB is supposed to be using it.
>
> Also, judging by the absence of an application metric I emit from the
> state processor operator, KeyedStateReaderFunction.readKey never gets
> called.
>
> I would appreciate it if somebody could help answer some of my questions
> or suggest a way I could further diagnose/fix this:
>
> 1. Is it normal that this overwhelming garbage collection starts long
> before reaching 100% heap usage? At the time it happens there's usually
> 10-15GB of heap showing up as available.
>
> 2. Am I correct to assume that even in batch mode Flink implements memory
> back pressure and is supposed to slow down processing/allocations when it's
> low on available heap memory?
>
> 3. If #2 is true, is it possible that due to some misconfiguration Flink
> considers more heap space to be available than there actually is and keeps
> allocating even though there's no more heap?
>
> 4. As an alternative to #3, is it possible that there are some
> unaccounted-for heap allocations that are not shown in the UI or by the
> metric and are therefore not taken into account by the memory back
> pressure mechanism?
>
> Here's the minimal code example that demonstrates the issue:
> https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f
>
> I'm running this on Flink 1.12.2 (and many earlier versions, too) with
> the following base configuration and a parallelism of 80 (I tried lowering
> that to have more resources available, too):
> https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025
>
> I tried many things with no success:
> - reducing parallelism and making more resources available to each task
> manager
> - enabling object reuse and modifying the tuple mapper to avoid extra
> tuple allocations
> - manipulating memory ratios to allocate more memory to heap or managed
> memory (see the example below)
> - allocating 20% of memory for JVM overhead
> - switching to G1GC garbage collector
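>
> For reference, the memory/GC-related settings I was experimenting with
> looked roughly like this in flink-conf.yaml (illustrative values only; the
> actual base configuration is in the gist linked above):
>
> taskmanager.memory.process.size: 120g
> taskmanager.memory.managed.fraction: 0.4
> taskmanager.memory.jvm-overhead.fraction: 0.2
> # raised together with the fraction so the 20% overhead actually takes effect
> taskmanager.memory.jvm-overhead.max: 24g
> env.java.opts: -XX:+UseG1GC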
>
> Again, would appreciate any help with this.
>
> --
> With regards,
> Andrey Bulgakov
>


-- 
With regards,
Andrey Bulgakov
