Hi Faxian,

We also tried to fix the problem of a single large list state in RocksDB, and 
we have a private solution that adds an atomically increasing number to the 
RocksDB key bytes. We keep the number in the checkpoint so that the order is 
not broken after restoring from a checkpoint.
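
As a rough illustration of the idea only (a minimal sketch, not our actual 
code): a dict stands in for RocksDB, and the class name, the key prefix and 
the 8-byte big-endian encoding of the number are all assumptions made for 
this example.

```python
import struct


class SequencedListState:
    """Toy model: one store entry per list element, ordered by a counter."""

    def __init__(self, prefix: bytes):
        self.prefix = prefix   # stands in for #KeyGroup#Key#Namespace
        self.store = {}        # stands in for RocksDB
        self.next_seq = 0      # the number that has to go into the checkpoint

    def add(self, value: bytes) -> None:
        # Fixed-width big-endian keeps byte order equal to numeric order.
        key = self.prefix + struct.pack(">Q", self.next_seq)
        self.store[key] = value
        self.next_seq += 1

    def get(self):
        # RocksDB iterates keys in lexicographic byte order by default;
        # sorted() mimics that here.
        return [self.store[k] for k in sorted(self.store)
                if k.startswith(self.prefix)]

    def snapshot(self):
        # The counter is saved together with the entries.
        return dict(self.store), self.next_seq

    @classmethod
    def restore(cls, prefix: bytes, snapshot):
        entries, next_seq = snapshot
        state = cls(prefix)
        state.store = dict(entries)
        state.next_seq = next_seq  # continue numbering after the old entries
        return state
```

If the counter were not restored, new elements would start again from 0 and 
overwrite or interleave with the old entries, which is why the number has to 
be part of the checkpoint.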

I think FLINK-8297 mainly focuses on resolving large list storage in RocksDB, 
and a timestamp is just one solution. Actually, I did not get your point about 
why we should use the record's timestamp.

After we implemented the element-wise RocksDB list state in our environment, 
we found that, as expected, it performs much worse than the original list 
state, and we do not recommend that users use this feature directly unless 
they are sure the list state is really large.

Best
Yun Tang

________________________________
From: Andrey Zagrebin <and...@ververica.com>
Sent: Monday, April 15, 2019 17:09
To: dev
Subject: Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased 
RocksDBListState

Hi Faxian,

Thanks for thinking about this new approach. Here are my thoughts:

- In the case of event time, although this approach changes the semantics of
the original list state, it could be a good fit for certain use cases. The
main advantage is that it is deterministic in event time: the list should
always end up in the same order.

- In the case of processing time, clock skew might be a problem. If the task
executor's clock jumps back for some reason, or it fails and another TE with
a shifted clock takes over, this can potentially reorder list elements. If we
rather think about the list state as a bag, reordering might be OK, but
there is also a risk that different elements end up with the same
processing time and overwrite each other.

- In general, a larger storage footprint is a trade-off for making list state
more scalable, and it should be OK as long as we do not degrade the existing
approach.

Let's see other opinions.

Best,
Andrey

On Fri, Apr 12, 2019 at 10:34 AM Faxian Zhao <faxianz...@gmail.com> wrote:

> Referring to PR#5185, I think we can use a Timebased RocksDBListState to
> resolve it.
> The Timebased RocksDBListState stores list entries dispersed in RocksDB,
> like RocksDBMapState.
> Key:
> For time-based Flink internal classes like StreamRecord (with
> event/ingestion time enabled), the RocksDB key is
> #KeyGroup#Key#Namespace#StreamRecord.getTimestamp().
> Otherwise, the key uses the current processing time as the timestamp.
> Value:
> The RocksDB value holds the entries that have the same
> timestamp (event/ingestion/processing time), like the original
> RocksDBListState.
>
> ListState.get() is implemented like
> org.apache.flink.contrib.streaming.state.RocksDBMapState#iterator.
> Generally, it won't load all entries at once.
>
> The RocksDB storage structure:
> -----------Key-------------------   --------------------Value---------
> #KeyGroup#Key#Namespace             #KeyGroup#Key#Namespace#ts3 (the lexicographically max key)
> #KeyGroup#Key#Namespace#ts1         value1,value2,value7
> #KeyGroup#Key#Namespace#ts2         value4,value6
> #KeyGroup#Key#Namespace#ts3         value3,value5
>
>
> Advantages:
> 1. Because RocksDB stores keys in lexicographic order, the entries are
> monotonic in time. This is friendly to event-time record processing.
> 2. We can store the max timestamp key under the RocksDB default
> key (#KeyGroup#Key#Namespace), so we can iterate the stored list in reverse.
> 3. For the CountEvictor and TimeEvictor, we can stop the iteration early
> instead of reading all entries into memory.
> 4. Since this ListState is monotonic in time, we can provide some more
> methods for event-time record processing.
> 5. I think it resolves the TTL issue naturally.
>
> Disadvantages:
> 1. It adds 8 bytes to store the extended timestamp in the key part. I'm
> not a RocksDB expert, so I don't know how this affects performance.
> 2. For event-time StreamRecords, it reorders the entries by event
> time. This behavior is not aligned with other ListState implementations.
> 3. For other records, the extended key is useless overhead.
> 4. If all of the entries have the same timestamp, the storage structure is
> almost the same as the original RocksDBListState.
> 5. We can't easily implement the remove and size methods for ListState yet.
>
> Implementation:
> We can abstract a new class that is the parent of the Timebased
> RocksDBListState and RocksDBMapState, but we would need to modify
> InternalLargeListState.
> I drafted some code for this in PR#7675.
>
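
The key layout quoted above can be sketched roughly as follows. This is only
an illustration under assumptions: a sorted in-memory list stands in for
RocksDB's lexicographic key order, timestamps are taken to be non-negative
and encoded as 8-byte big-endian so that byte order matches time order, and
the names PREFIX, make_key, TimeKeyedList and latest are hypothetical.

```python
import struct
from bisect import insort

PREFIX = b"kg|key|ns|"  # hypothetical stand-in for #KeyGroup#Key#Namespace


def make_key(ts: int) -> bytes:
    # Assumes ts >= 0; big-endian keeps byte order equal to time order.
    return PREFIX + struct.pack(">Q", ts)


class TimeKeyedList:
    """Toy model: one store entry per timestamp, holding all its values."""

    def __init__(self):
        self.keys = []    # kept sorted, mimicking RocksDB's key order
        self.values = {}  # key -> values that share that timestamp

    def add(self, ts: int, value) -> None:
        key = make_key(ts)
        if key not in self.values:
            insort(self.keys, key)
            self.values[key] = []
        self.values[key].append(value)

    def iter_forward(self):
        # Lazily walks the entries in timestamp order, like a map iterator.
        for key in self.keys:
            yield from self.values[key]

    def latest(self, n: int):
        # Reverse iteration from the max key, stopping early after n elements.
        out = []
        for key in reversed(self.keys):
            for value in reversed(self.values[key]):
                out.append(value)
                if len(out) == n:
                    return out
        return out
```

Adding value1 to value7 in arrival order with the timestamps from the table
yields value1, value2, value7, value4, value6, value3, value5 on forward
iteration, which also shows the reordering by time mentioned under the
disadvantages.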
