Extracting state keys for a very large RocksDB savepoint

2021-03-09 Thread Andrey Bulgakov
Hi all,

I'm trying to use the State Processor API to extract all keys from a
RocksDB savepoint produced by an operator in a Flink streaming job into CSV
files.

The problem is that the savepoint is 30TB on disk, and I'm running into
garbage collection issues no matter how much memory (in different
proportions) or how many CPU cores I allocate to the task managers. (I tried
allocating up to 120GB and 16 cores to each task manager.)

The same program and hardware configuration works with no problems for a
smaller savepoint (300GB), so this appears to be some sort of scalability issue.

At the beginning, the tasks spend a couple of hours in what I call "the
download phase". During that phase, heap usage as indicated by the metrics
and the Flink UI sits at about 10%, and everything is going great.

But at a certain point, heap usage for tasks coming out of the download phase
starts to go up, climbing to about 87% as indicated in the Flink UI and
by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap
usage metric stops increasing and the JVM starts spending a lot of its time
collecting garbage, keeping all CPUs 100% loaded. After some time in
this mode the job crashes with "java.util.concurrent.TimeoutException:
Heartbeat of TaskManager with id container_1614821414188_0002_01_35
timed out."

At all times the reported managed memory usage is 0%, which seems
suspicious since RocksDB is supposed to be using it.

Also, judging by the lack of an application metric I have in the state
processor operator, KeyedStateReaderFunction.readKey never gets called.

I would appreciate if somebody helped answering some of my questions or
suggested a way I could further diagnose/fix this:

1. Is it normal for this overwhelming garbage collection to start long
before heap usage reaches 100%? When it happens, there's usually
10-15GB of heap still showing as available.

2. Am I correct to assume that even in batch mode Flink implements memory
back pressure and is supposed to slow down processing/allocations when it's
low on available heap memory?

3. If #2 is true, is it possible that due to some misconfiguration Flink
considers more heap space to be available than there actually is and keeps
allocating even though there's no more heap?

4. As an alternative to #3, is it possible that there are unaccounted-for
heap allocations that are not shown in the UI or by the metric and are
therefore not taken into account by the memory back-pressure mechanism?

Here's the minimal code example that demonstrates the issue:
https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f

I'm running this on Flink 1.12.2 (and many earlier versions, too) with the
following base configuration and a parallelism of 80 (I tried lowering that to
have more resources available, too):
https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025

I tried many things with no success:
- reducing parallelism and making more resources available to each task
manager
- enabling object reuse and modifying the tuple mapper to avoid extra tuple
allocations
- manipulating memory ratios to allocate more memory as heap or managed
memory
- allocating 20% of memory for JVM overhead
- switching to G1GC garbage collector

Again, would appreciate any help with this.

-- 
With regards,
Andrey Bulgakov


Re: Extracting state keys for a very large RocksDB savepoint

2021-03-14 Thread Andrey Bulgakov
If anyone is interested, I realized that the State Processor API was not the
right tool for this, since it spends a lot of time rebuilding RocksDB tables
and then a lot of memory trying to read from them. All I really needed was
the operator keys.

So I used SavepointLoader.loadSavepointMetadata to get the
KeyGroupsStateHandle objects and built an InputFormat heavily based on the
code in RocksDBFullRestoreOperation.java.

It ended up working extremely quickly while keeping memory and CPU usage to a
minimum.
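For anyone curious, the metadata-only approach can be sketched roughly as
follows. This is a hedged sketch only: SavepointLoader, KeyGroupsStateHandle,
and related classes are internal Flink APIs whose packages and signatures
vary between versions, and the operator-ID filter is a placeholder.

```scala
import scala.collection.JavaConverters._
import org.apache.flink.runtime.state.KeyGroupsStateHandle
import org.apache.flink.state.api.runtime.SavepointLoader

// Load only the savepoint metadata -- no RocksDB instance is rebuilt.
val metadata =
  SavepointLoader.loadSavepointMetadata("s3://bucket/savepoints/savepoint-abc123")

// Collect the keyed-state handles of the operator of interest
// ("<operator-id>" is a hypothetical placeholder).
val handles: Seq[KeyGroupsStateHandle] =
  metadata.getOperatorStates.asScala.toSeq
    .filter(_.getOperatorID.toString == "<operator-id>")
    .flatMap(_.getStates.asScala)            // one OperatorSubtaskState per subtask
    .flatMap(_.getManagedKeyedState.asScala)
    .collect { case h: KeyGroupsStateHandle => h }

// Each handle wraps a raw stream plus per-key-group offsets; decoding the
// entries inside a key group is the part borrowed from
// RocksDBFullRestoreOperation (serialization-proxy header, then framed
// key/value pairs per key group).
handles.foreach(h => println(h.getKeyGroupRange))
```

The point of the design is that only stream offsets and serialized bytes are
touched, so no state backend (and none of its memory) is ever instantiated.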

On Tue, Mar 9, 2021 at 1:51 PM Andrey Bulgakov  wrote:



-- 
With regards,
Andrey Bulgakov


Re: Extracting state keys for a very large RocksDB savepoint

2021-03-17 Thread Andrey Bulgakov
Hi Gordon,

I think my current implementation is very specific and wouldn't be that
valuable for the broader public.
But I think there's a potential version of it that could also retrieve
values from a savepoint in the same efficient way and that would be
something that other people might need.

I'm currently thinking about something similar to KeyedProcessFunction, but
taking a single state descriptor as a parameter instead of expecting the user
to "register" descriptors in open(). The processElement() method would then
be invoked with both the key and the value.

One thing I'm not sure about is MapStateDescriptor, because map state stores
its compound keys separately, and I'm not sure whether they are stored in
sorted order and can be passed to processElement() as a group, or whether
they should be passed separately.

I'll experiment with this for a while and try to figure out what works.
Please let me know if you have thoughts about this.

On Sun, Mar 14, 2021 at 11:55 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Andrey,
>
> Perhaps the functionality you described is worth adding to the State
> Processor API.
> Your observation of how the library currently works is correct; basically,
> it tries to restore the state backends as-is.
>
> In your current implementation, do you see it worthwhile to try to add this?
>
> Cheers,
> Gordon
>
>


-- 
With regards,
Andrey Bulgakov


Re: Extracting state keys for a very large RocksDB savepoint

2021-03-17 Thread Andrey Bulgakov
I guess there's no point in making it a KeyedProcessFunction, since it's not
going to have access to the context, timers, or anything like that. So it can
be a simple InputFormat returning a DataSet of key/value tuples.
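A minimal skeleton of such an InputFormat might look like this. This is a
sketch only: the class name, the use of String keys and raw byte values, and
the actual key-group decoding are all illustrative placeholders, not the real
implementation.

```scala
import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.core.io.GenericInputSplit

// Sketch: each parallel instance of the format reads a disjoint subset of
// the savepoint's key-group handles and emits (key, value) tuples.
class SavepointKeyValueInputFormat(savepointPath: String)
    extends GenericInputFormat[Tuple2[String, Array[Byte]]] {

  // Iterator over the entries of the key groups assigned to this split.
  @transient private var entries: Iterator[(String, Array[Byte])] = _

  override def open(split: GenericInputSplit): Unit = {
    // 1. Load the savepoint metadata (SavepointLoader.loadSavepointMetadata).
    // 2. Pick the KeyGroupsStateHandles whose ranges belong to
    //    split.getSplitNumber.
    // 3. Open each handle's stream and decode entries key group by key
    //    group, following RocksDBFullRestoreOperation.
    entries = Iterator.empty // placeholder for the decoding logic
  }

  override def reachedEnd(): Boolean = !entries.hasNext

  override def nextRecord(
      reuse: Tuple2[String, Array[Byte]]): Tuple2[String, Array[Byte]] = {
    val (k, v) = entries.next()
    reuse.f0 = k
    reuse.f1 = v
    reuse
  }
}
```

Mapping one split per key-group handle keeps the work embarrassingly
parallel, since key groups never overlap between handles.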

On Wed, Mar 17, 2021 at 8:37 AM Andrey Bulgakov  wrote:



-- 
With regards,
Andrey Bulgakov


No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-18 Thread Andrey Bulgakov
Hey all,

I'm working on migrating our Flink job from Hadoop session mode to K8S
application mode. It's been going great so far, but I'm hitting a wall with
one seemingly simple thing.

In the first phase of the migration I want to remove some operators (their
state can be discarded) and focus on getting the primary pipeline running
first.
For that I have to start the cluster from a savepoint with the
"allowNonRestoredState" parameter turned on.

The problem is that I can't set it in any way that I'm aware of. I tried four
approaches, both separately and simultaneously:

1) Adding --allowNonRestoredState to flink run-application
-t kubernetes-application
2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
run-application -t kubernetes-application
3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
flink-conf.yaml where I'm running flink run-application
4) Overriding it in the application code:
val sigh = new Configuration()
sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
true)
env.configure(sigh)

Every time, the resulting pod ends up with a "false" value for this setting
in its ConfigMap:
$ kc describe cm/flink-config-flink-test | grep ignore
execution.savepoint.ignore-unclaimed-state: false

And I get the exception:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
. Cannot map checkpoint/savepoint state for operator
68895e9129981bfc6d96d1dad715298e to the new program, because the operator
is not available in the new program. If you want to allow to skip this, you
can set the --allowNonRestoredState option on the CLI.

It seems like something overrides it to false and it never has any effect.

Can this be a bug or am I doing something wrong?

For context, the savepoint is produced by Flink 1.8.2 and the version I'm
trying to run on K8S is 1.14.3.

-- 
With regards,
Andrey Bulgakov


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-18 Thread Andrey Bulgakov
Hi Austin,

Thanks for the reply! Yeah, the docs aren't super explicit about this.

But for what it's worth, I'm setting a few options unrelated to kubernetes
this way and they all have effect:
-Dstate.checkpoints.num-retained=100 \
-Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
-Dio.tmp.dirs=/data/flink-local-data \
-Dqueryable-state.enable=true \

The only one I'm having problems with is
"execution.savepoint.ignore-unclaimed-state".

On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Andrey,
>
> It's unclear to me from the docs[1] if the flink native-kubernetes
> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>
> Best,
> Austin
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>
> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov  wrote:

-- 
With regards,
Andrey Bulgakov


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-22 Thread Andrey Bulgakov
Thank you, Yang. That was it! Specifying "--fromSavepoint" and
"--allowNonRestoredState" together for "run-application" did the trick.

I was a bit confused because when you run "flink run-application --help", it
only tells you about the "--executor" and "--target" options, so I assumed I
should pass everything else as -D parameters. I had tried passing
"--allowNonRestoredState" on the CLI as a last resort, but didn't think to do
it together with "--fromSavepoint".
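For reference, a working invocation would look roughly like this (the cluster
id, container image, bucket, and jar path below are illustrative
placeholders):

```shell
# Both savepoint-related flags go to `flink run-application` directly;
# per Yang's note, these CLI options take priority over -D config options.
flink run-application \
  -t kubernetes-application \
  -Dkubernetes.cluster-id=flink-test \
  -Dkubernetes.container.image=registry.example.com/flink-job:latest \
  --fromSavepoint s3://my-bucket/savepoints/savepoint-abc123 \
  --allowNonRestoredState \
  local:///opt/flink/usrlib/my-job.jar
```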

Thanks again!

On Sun, Feb 20, 2022 at 9:49 PM Yang Wang  wrote:

> By design, we should support arbitrary config keys via the CLI when using
> generic CLI mode.
>
> Have you also specified "--fromSavepoint" along with
> "--allowNonRestoredState" when submitting the Flink job via "flink
> run-application"?
>
> From the current code base, it seems that the CLI options (e.g.
> --fromSavepoint, --allowNonRestoredState) have higher priority than the
> Flink config options, and this causes the savepoint-related config options
> to be overwritten wrongly. Refer to the implementation[1].
>
> [1].
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
>
>
> Best,
> Yang
>
> Andrey Bulgakov wrote on Sat, Feb 19, 2022 at 08:30:
>

-- 
With regards,
Andrey Bulgakov