Thanks Roman, that's exactly what I needed.

On Wed, Feb 24, 2021 at 2:37 PM Roman Khachatryan <ro...@apache.org> wrote:
>
> Thanks for the clarification.
>
> RocksDB stores whatever value Flink passes to it after serialization.
> The value is passed as an array of bytes, so the minimum is a single byte.
> An Integer would require 4 bytes, an Object 1 or 2 bytes depending on the
> serializer (POJO or Kryo), and a boolean just 1 byte.
> Besides that, boolean serialization is apparently faster.
>
> Sizes in memory, on disk, and of snapshots are all affected proportionally.
>
> You are right that Flink's compression settings will not have any impact
> with incremental checkpoints.
>
> Regards,
> Roman
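A minimal sketch of the byte counts above, using only `java.io` rather than Flink itself. It assumes (as we understand Flink's built-in `IntSerializer` and `BooleanSerializer`) that an Integer is written with a single `writeInt` and a boolean with a single `writeBoolean`; the Object case goes through Kryo or the POJO serializer and is not reproduced here. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializedValueSizes {

    /** Something that writes one value to a DataOutputStream. */
    interface ValueWriter {
        void write(DataOutputStream out) throws IOException;
    }

    /** Returns the bytes the writer produces, i.e. what a byte-oriented store would keep. */
    static byte[] toBytes(ValueWriter writer) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            writer.write(out);
        } catch (IOException e) {
            // Not expected for an in-memory buffer; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Assumed to mirror an int serializer that calls writeInt: always 4 bytes. */
    static int intValueSize(int value) {
        return toBytes(out -> out.writeInt(value)).length;
    }

    /** Assumed to mirror a boolean serializer that calls writeBoolean: always 1 byte. */
    static int booleanValueSize(boolean value) {
        return toBytes(out -> out.writeBoolean(value)).length;
    }

    public static void main(String[] args) {
        System.out.println("Integer value: " + intValueSize(42));      // 4
        System.out.println("Boolean value: " + booleanValueSize(true)); // 1
    }
}
```

The sizes do not depend on the value itself, only on what the serializer writes, which is why swapping the state's value type changes the per-key cost.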
>
>
> On Wed, Feb 24, 2021 at 11:01 AM Maciej Obuchowski 
> <obuchowski.mac...@gmail.com> wrote:
>>
>> Hey. Let me send a simplified example, because I don't think this
>> "(given that the actual stored objects (integers) are the same)" is
>> true; I'm just storing an Object as a placeholder:
>>
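>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> public class DeduplicationProcessFunction<K, IN>
>>         extends KeyedProcessFunction<K, IN, IN>
>>         implements CheckpointedFunction {
>>
>>     // Only the presence of a value per key matters; the Object is a placeholder.
>>     private transient ValueState<Object> processedState;
>>
>>     public DeduplicationProcessFunction() { }
>>
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>         // Nothing to do: keyed state is snapshotted by the state backend.
>>     }
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         ValueStateDescriptor<Object> descriptor =
>>             new ValueStateDescriptor<>("processed", TypeInformation.of(Object.class));
>>         processedState = context.getKeyedStateStore().getState(descriptor);
>>     }
>>
>>     @Override
>>     public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
>>         Object processed = processedState.value();
>>         if (processed == null) {
>>             // First element for this key: mark it as seen and emit it.
>>             processedState.update(new Object());
>>             out.collect(value);
>>         }
>>         // Later elements with the same key are dropped as duplicates.
>>     }
>> }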
>>
>>
>>
>> Basically, I'm not sure what RocksDB stores in this case. I'm sure
>> that it needs to store the key, which is a 32-byte SHA key in this case.
>> What's the value? Is it the 16 bytes that Java requires in memory? If
>> I change my ValueState to Integer and store an additional value
>> there, will it require more storage space? Also, to respond to your
>> point about compression: we're using incremental checkpoints, so I
>> don't think anything will change, as per the docs. I'm interested not
>> only in snapshot size, but also in the size of the current in-memory
>> and local disk state.
>>
>> Thanks,
>> Maciej
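A back-of-the-envelope sketch of the raw payload per entry, using the 32-byte key mentioned above. The key count is a made-up illustrative number, and the estimate deliberately ignores whatever extra bytes RocksDB and Flink add per entry as well as any compression, so treat it as a rough comparison of value types, not a prediction of actual state size. `StateSizeEstimate` and `rawPayloadBytes` are hypothetical names.

```java
public class StateSizeEstimate {

    // 32-byte SHA key, as described in the message above.
    static final int KEY_BYTES = 32;

    /** Raw key + value payload for numKeys entries, ignoring all storage overhead and compression. */
    static long rawPayloadBytes(long numKeys, int valueBytes) {
        return numKeys * (KEY_BYTES + valueBytes);
    }

    public static void main(String[] args) {
        long numKeys = 100_000_000L; // hypothetical key count, not a measurement
        long withBoolean = rawPayloadBytes(numKeys, 1); // 1-byte boolean value
        long withInteger = rawPayloadBytes(numKeys, 4); // 4-byte Integer value
        System.out.println("Boolean value: " + withBoolean + " bytes");
        System.out.println("Integer value: " + withInteger + " bytes");
        System.out.println("Difference:    " + (withInteger - withBoolean) + " bytes");
    }
}
```

With these hypothetical numbers, going from a 1-byte boolean to a 4-byte Integer adds 3 bytes per key, i.e. 300,000,000 bytes of raw payload for 100 million keys, on top of the 3,200,000,000 bytes the keys alone take.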
>>
>>
>>
>> On Tue, Feb 23, 2021 at 5:53 PM Roman Khachatryan <ro...@apache.org> wrote:
>> >
>> > Hi Maciej,
>> >
>> > If I understand correctly, you're asking whether ValueState parameterized 
>> > with Object has the same size as the one with Integer (given that the 
>> > actual stored objects (integers) are the same).
>> > With RocksDB, any state object is serialized first and only then
>> > stored in the MemTable or in an SST file. So it doesn't matter as long as
>> > the same serializer is used.
>> >
>> > You should probably try enabling compression if you haven't already:
>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
>> >
>> > Regards,
>> > Roman
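A toy illustration of the "serialize first, then store" point, assuming a simplified byte-oriented store. `ByteStore` and `intToBytes` are hypothetical stand-ins for RocksDB and a Flink serializer, not Flink's API. The same 4-byte int serializer is applied to a value whose static type is Object and to one whose static type is Integer; only the serializer determines how many bytes end up stored.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SerializeThenStore {

    /** Hypothetical stand-in for a byte-oriented store: it only ever sees byte arrays. */
    static final class ByteStore {
        private final Map<String, byte[]> entries = new HashMap<>();

        <T> void put(String key, T value, Function<? super T, byte[]> serializer) {
            // The declared type T is gone after this call; only the serialized bytes remain.
            entries.put(key, serializer.apply(value));
        }

        int storedValueSize(String key) {
            return entries.get(key).length;
        }
    }

    /** Hypothetical serializer: writes an int as 4 big-endian bytes. */
    static byte[] intToBytes(Object value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
    }

    public static void main(String[] args) {
        ByteStore store = new ByteStore();
        Object declaredAsObject = 42;
        Integer declaredAsInteger = 42;
        store.put("a", declaredAsObject, SerializeThenStore::intToBytes);
        store.put("b", declaredAsInteger, SerializeThenStore::intToBytes);
        System.out.println(store.storedValueSize("a") + " vs " + store.storedValueSize("b")); // 4 vs 4
    }
}
```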
>> >
>> >
>> > On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski 
>> > <obuchowski.mac...@gmail.com> wrote:
>> >>
>> >> Hey.
>> >>
>> >> We have a deduplication job with a large amount of keyed ValueState. We
>> >> want to decrease state size as much as possible, so we're using
>> >> ValueState<Object>, as it's the smallest possible Java non-primitive. However,
>> >> as per https://www.baeldung.com/java-size-of-object (and my measurements),
>> >> a Java Integer has the same memory size as an Object due to padding.
>> >> Will this still be true with RocksDB state? Can we put an Integer in state
>> >> without increasing state size?
>> >>
>> >> Thanks, Maciej
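For contrast, a sketch of the on-heap arithmetic behind the padding observation, under stated assumptions: a 64-bit HotSpot JVM with compressed class pointers (12-byte object header) and 8-byte object alignment. Other JVMs or flags give different numbers, and as the replies in this thread point out, these heap sizes do not carry over to RocksDB, which only stores serialized bytes. `HeapSizeEstimate` is a hypothetical name.

```java
public class HeapSizeEstimate {

    // Assumptions: 64-bit HotSpot with compressed class pointers and default alignment.
    static final int HEADER_BYTES = 12; // 8-byte mark word + 4-byte class pointer
    static final int ALIGNMENT = 8;     // objects are padded to a multiple of 8 bytes

    /** Shallow size: header plus field bytes, rounded up to the alignment. */
    static int shallowSize(int fieldBytes) {
        int raw = HEADER_BYTES + fieldBytes;
        return (raw + ALIGNMENT - 1) / ALIGNMENT * ALIGNMENT;
    }

    public static void main(String[] args) {
        System.out.println("Object:  " + shallowSize(0)); // 16 (12 padded up)
        System.out.println("Integer: " + shallowSize(4)); // 16 (12 + 4, no padding needed)
        System.out.println("Boolean: " + shallowSize(1)); // 16 (13 padded up)
    }
}
```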
