Referring to PR#5185, I think we can use a time-based RocksDBListState
to resolve it.
A time-based RocksDBListState stores its list entries spread across
multiple RocksDB entries, like RocksDBMapState does.
Key part:
For time-based Flink internal classes such as StreamRecord (with
event/ingestion time enabled), the RocksDB key is
#KeyGroup#Key#Namespace#StreamRecord.getTimestamp().
Otherwise, the timestamp part of the key is the current processing time.
Value part:
The RocksDB value holds all entries that have the same timestamp
(event/ingestion/processing time), stored the same way as in the
original RocksDBListState.
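
A minimal sketch of the key part, to show why such keys sort by time.
This is not Flink code: TimeKeySketch, appendTimestamp and compareKeys
are hypothetical names, the prefix is a plain string standing in for the
serialized #KeyGroup#Key#Namespace, and flipping the sign bit is my own
assumption so that negative timestamps also sort correctly. It needs
Java 9+ for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class TimeKeySketch {

    // Hypothetical helper, not Flink code: appends the timestamp to an
    // already serialized #KeyGroup#Key#Namespace prefix (8 extra bytes).
    static byte[] appendTimestamp(byte[] prefix, long timestamp) {
        return ByteBuffer.allocate(prefix.length + Long.BYTES)
                .put(prefix)
                // ByteBuffer writes big-endian by default. Flipping the sign
                // bit is an assumption of this sketch so that negative
                // timestamps sort before positive ones.
                .putLong(timestamp ^ Long.MIN_VALUE)
                .array();
    }

    // Unsigned bytewise comparison, the same ordering that RocksDB's
    // default comparator applies to keys.
    static int compareKeys(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        byte[] prefix = "kg#key#ns".getBytes(StandardCharsets.UTF_8);
        byte[] early = appendTimestamp(prefix, 1_000L);
        byte[] late = appendTimestamp(prefix, 2_000L);
        // The earlier timestamp produces the smaller key.
        System.out.println(compareKeys(early, late) < 0);
    }
}
```

With a fixed-width big-endian timestamp, byte order and time order
agree, which is what the rest of this proposal relies on.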

ListState.get() is implemented like
org.apache.flink.contrib.streaming.state.RocksDBMapState#iterator.
In general, it does not load all entries at once.

The RocksDB store structure:
-----------Key---------------   -----------Value-----------
#KeyGroup#Key#Namespace         #KeyGroup#Key#Namespace#ts3
                                (lexicographically max key)
#KeyGroup#Key#Namespace#ts1     value1,value2,value7
#KeyGroup#Key#Namespace#ts2     value4,value6
#KeyGroup#Key#Namespace#ts3     value3,value5
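
To make the layout above concrete, here is a small in-memory sketch. It
is only an illustration: a TreeMap stands in for the sorted RocksDB
entries under one #KeyGroup#Key#Namespace prefix, a field stands in for
the value stored under the bare prefix key, and the class and method
names are hypothetical, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class TimeBucketedListSketch {

    // Stand-in for the RocksDB entries sharing one prefix: a sorted map
    // from timestamp to the values that carry this timestamp.
    private final TreeMap<Long, List<String>> buckets = new TreeMap<>();

    // Stand-in for the value stored under the bare prefix key.
    private Long maxTimestamp;

    void add(long timestamp, String value) {
        buckets.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        if (maxTimestamp == null || timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    // Flattens the buckets in ascending timestamp order. A real state
    // would iterate lazily instead of building the whole list.
    List<String> get() {
        List<String> result = new ArrayList<>();
        for (List<String> bucket : buckets.values()) {
            result.addAll(bucket);
        }
        return result;
    }

    Long maxTimestamp() {
        return maxTimestamp;
    }

    public static void main(String[] args) {
        TimeBucketedListSketch state = new TimeBucketedListSketch();
        long ts1 = 1L, ts2 = 2L, ts3 = 3L;
        // Values arrive in the order value1 .. value7.
        state.add(ts1, "value1");
        state.add(ts1, "value2");
        state.add(ts3, "value3");
        state.add(ts2, "value4");
        state.add(ts3, "value5");
        state.add(ts2, "value6");
        state.add(ts1, "value7");
        // prints [value1, value2, value7, value4, value6, value3, value5]
        System.out.println(state.get());
    }
}
```

Note that get() returns the values grouped by timestamp, not in
insertion order.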


Advantages:
1. RocksDB stores keys in lexicographical order, so the entries are
ordered by time. This is friendly to event-time record processing.
2. We can store the max timestamp key under the RocksDB default key
(#KeyGroup#Key#Namespace), and then iterate the stored list in reverse.
3. For the CountEvictor and TimeEvictor, we can stop the iteration early
instead of reading all entries into memory.
4. Since this ListState is ordered by time, we can provide some more
methods for event-time record processing.
5. I think it resolves the TTL issue naturally.
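
Points 2 and 3 can be sketched as follows. Again this is only an
illustration under assumptions: a TreeMap stands in for the sorted
RocksDB entries, EarlyStopSketch and its methods are hypothetical names,
and the cutoff rule (evict everything with timestamp <= max timestamp
minus window size) is how I understand TimeEvictor to work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EarlyStopSketch {

    // CountEvictor-style: walks from the newest bucket backwards and
    // returns the timestamps of the buckets that had to be read in order
    // to collect the newest `count` values. Older buckets are never read.
    static List<Long> bucketsReadForNewest(
            TreeMap<Long, List<String>> buckets, int count) {
        List<Long> visited = new ArrayList<>();
        int collected = 0;
        for (Map.Entry<Long, List<String>> e
                : buckets.descendingMap().entrySet()) {
            if (collected >= count) {
                break; // enough values collected, stop early
            }
            visited.add(e.getKey());
            collected += e.getValue().size();
        }
        return visited;
    }

    // TimeEvictor-style (assumed rule): counts the values whose timestamp
    // is <= maxTimestamp - windowSize, using a range view of the sorted map.
    static int countExpired(
            TreeMap<Long, List<String>> buckets, long windowSize) {
        if (buckets.isEmpty()) {
            return 0;
        }
        long cutoff = buckets.lastKey() - windowSize;
        int expired = 0;
        for (List<String> bucket : buckets.headMap(cutoff, true).values()) {
            expired += bucket.size();
        }
        return expired;
    }

    public static void main(String[] args) {
        TreeMap<Long, List<String>> buckets = new TreeMap<>();
        buckets.put(1L, Arrays.asList("value1", "value2", "value7"));
        buckets.put(2L, Arrays.asList("value4", "value6"));
        buckets.put(3L, Arrays.asList("value3", "value5"));
        System.out.println(bucketsReadForNewest(buckets, 2));
        System.out.println(countExpired(buckets, 1L));
    }
}
```

In RocksDB the same walk would use a reverse iterator starting from the
max timestamp key instead of descendingMap().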

Disadvantages:
1. It adds 8 bytes to each key to store the timestamp. I'm not very
familiar with RocksDB, so I don't know the performance impact.
2. For event-time StreamRecords, it reorders the entries by event time.
This behavior is not aligned with the other ListState implementations.
3. For other records, the timestamp in the key is useless overhead.
4. If all of the entries have the same timestamp, the store structure is
almost the same as the original RocksDBListState.
5. We can't easily implement remove and size methods for ListState yet.

Implementation:
We can extract a new class as the parent of the time-based
RocksDBListState and RocksDBMapState, but we would have to modify
InternalLargeListState.
I drafted some code for this in PR#7675.
