Some additional information that I've gathered:

  *   The number of unique keys in the system is 10, and that is correctly 
reflected in the state.
  *   TTL for global window state is set to update on read and write, but the 
code also removes old state manually based on event time (a sketch of that TTL 
configuration is shown below this list).
  *   Not sure it's relevant, but the Flink cluster does run with jemalloc 
enabled.
  *   GitHub gist with the whole processor setup since it's not too long: 
https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
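
For reference, here is a minimal sketch of that TTL setup, using the values 
mentioned in this thread (the descriptor name is illustrative, not the exact 
one from the job):

    import java.util.List;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.Types;

    // 1-hour TTL, refreshed on both reads and writes, with cleanup delegated
    // to RocksDB's compaction filter.
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .cleanupInRocksdbCompactFilter(1000L)
            .build();

    MapStateDescriptor<Long, List<String>> descriptor = new MapStateDescriptor<>(
            "globalMapState",
            Types.LONG,
            Types.LIST(Types.STRING)
    );
    descriptor.enableTimeToLive(ttlConfig);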

Relevant configuration entries (explicitly set; everything else is left at the defaults):

state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 30 s
execution.checkpointing.min-pause: 25 s
execution.checkpointing.timeout: 5 min
execution.savepoint-restore-mode: CLAIM
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

Over the weekend, state size has grown to 1.23GB, with the operators referenced 
in the processor program taking 849MB, so I'm still pretty puzzled. I thought 
it could be due to expired state being retained, but that shouldn't be possible 
if I have a finite number of keys, right?

Regards,
Alexis.

From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Saturday, April 9, 2022 01:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Roman,

Here's an example of a WindowReaderFunction:

    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.state.api.functions.WindowReaderFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
        // Must use the same descriptor id and type as the streaming operator.
        private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
                "descriptorId",
                Integer.class
        );

        @Override
        public void readWindow(String key, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
            // Emit the number of list-state entries for this key+window pair.
            int count = 0;
            for (Integer ignored : context.windowState().getListState(LSD).get()) {
                count++;
            }
            out.collect(count);
        }
    }

That's for the operator that uses window state. The other readers do something 
similar but with context.globalState(). That should provide the number of state 
entries for each key+window combination, no? And after collecting all results, 
I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
context.windowState().getListState(...).clear().
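
Concretely, the override looks roughly like this (a sketch; the descriptor is 
the same one the operator uses elsewhere):

    @Override
    public void clear(Context context) throws Exception {
        // Drop the per-window list state when the window is purged.
        context.windowState().getListState(LSD).clear();
    }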


Side note: in the state processor program I call 
ExecutionEnvironment#setParallelism(1) even though my streaming job runs with 
parallelism=4; that doesn't affect the result, does it?

Regards,
Alexis.

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState), which means that each entry can hold an arbitrary number of
state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?
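
For example, something along these lines (an untested sketch; the uid and 
descriptor must match the ones in your job):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    public class KeyedCountReader extends KeyedStateReaderFunction<String, Integer> {
        private transient ListState<Integer> state;

        @Override
        public void open(Configuration parameters) {
            // Re-declare the same descriptor the streaming operator uses.
            state = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("descriptorId", Integer.class));
        }

        @Override
        public void readKey(String key, Context context, Collector<Integer> out) throws Exception {
            // Count the state entries stored for this key.
            int count = 0;
            for (Integer ignored : state.get()) {
                count++;
            }
            out.collect(count);
        }
    }

    // Usage: savepoint.readKeyedState("operator-uid", new KeyedCountReader());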

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> the RocksDB backend and incremental checkpoints. I've been monitoring a dev 
> environment that has been running for the last week, and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and state size is 917MB (as shown in the UI). The 
> tasks with the largest state (614MB) come from keyed sliding windows. Some 
> attributes of this job's setup:
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
> I have 3 operators with these states:
>
> Window state with ListState<Integer> and no TTL.
> Global window state with MapState<Long, List<String>> and a TTL of 1 hour 
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState<Pojo> where the Pojo has an int and a 
> long, a TTL of 1 hour, and configured with 
> cleanupInRocksdbCompactFilter(1000L) as well.
>
> Both operators with global window state have logic to manually remove old 
> state in addition to the configured TTL. The other operator overrides 
> clear() and clears its window state there.
>
> I have now analyzed the checkpoint folder with the state processor API, and 
> I'll note here that I see 50 folders named chk-*** even though I don't set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
>
> 10 entries
> 80 entries
> 200 entries
>
> I got those numbers with something like this:
>
> savepoint
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>         .process(...)
>         .collect()
>         .parallelStream()
>         .reduce(0, Integer::sum);
>
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
> Regards,
>
> Alexis.