[ 
https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Bessonov updated IGNITE-17990:
-----------------------------------
    Description: 
h3. Description

In the first design, full rebalance is implemented this way:
 * we drop partition data
 * we download partition data from the leader
 * we're done

There's a problem with this approach: if the download fails, we lose one 
follower. This is bad because, technically, the new leader may have more data 
in its log and could have replicated it to the follower, but now that is no 
longer possible.

Not only can it lead to hard-to-catch errors and require custom code in JRaft, 
it is also an unconditional data deletion without either explicit user approval 
or a durably preserved copy of the data.

Such an implementation is fine for a POC and some tests, but it cannot be 
allowed in the release version of the product.
h3. New proposed solution

As trivial as it may seem, the new solution is to _not delete data_ before the 
snapshot is fully downloaded and ready for the swap. Why is it trivial? Because 
this is literally what RAFT demands to be done.

Of course, there's a {*}but{*}. Applying a snapshot, once it's downloaded, 
should be {{O(1)}} with respect to the number of rows in the partition and the 
number of transactions in the TX state store. This may not be fully achievable, 
depending on the implementation that we choose; more on that later.

The following sections describe all my concerns and possible implementations. 
Some sections can be skipped while reading, for example, if you're not 
interested in a specific storage engine but want to read everything else.
h3. TX state storage

There's one really good thing about the TX state storage: it has no pluggable 
storage engine, there's only a single RocksDB-based implementation. This makes 
the following approach possible (a rough sketch follows the list):
 * when we stream data, we can write it into an SST file, almost like in the 
snapshots of the meta-storage and CMG storages
 * once the snapshot is downloaded, we ingest it into the storage
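
To make the alternative concrete, here is a minimal sketch, assuming the plain 
RocksJava API ({{SstFileWriter}} plus {{ingestExternalFile}}); the class and 
helper names are hypothetical and only illustrate the idea:
{code:java}
// A minimal sketch of the SST-based alternative, assuming the RocksJava API.
// Class and helper names, and the "entries are already sorted" contract, are
// assumptions for illustration only.
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;

class TxStateSnapshotSketch {
    /** Streams snapshot entries (already sorted by the storage comparator) into a standalone SST file. */
    static Path writeSstFile(Path tmpDir, List<Map.Entry<byte[], byte[]>> sortedEntries) throws RocksDBException {
        Path sstFile = tmpDir.resolve("tx-state-snapshot.sst");

        try (Options options = new Options();
                EnvOptions envOptions = new EnvOptions();
                SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstFile.toString());

            for (Map.Entry<byte[], byte[]> entry : sortedEntries) {
                writer.put(entry.getKey(), entry.getValue());
            }

            writer.finish();
        }

        return sstFile;
    }

    /** Ingests the downloaded SST file into the live storage; the flush/copy concerns below apply to this step. */
    static void ingest(RocksDB db, ColumnFamilyHandle cf, Path sstFile) throws RocksDBException {
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            db.ingestExternalFile(cf, List.of(sstFile.toString()), ingestOptions);
        }
    }
}
{code}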

What I like about this solution is that it's very simple. But there are 
concerns:
 * ingestion leads to an implicit data flush. Technically, RocksDB could 
implement a "load" operation that doesn't require that
 * it's not clear whether RocksDB creates a copy of the SST file or not. I would 
assume that it does, because the file might be in another folder or on another 
device, for example. Although copying files is fast, it still takes time. Add 
to this the time required for the flush and we see a problem: the operation may 
become unnecessarily long

For these reasons, I don't think that such a solution should be implemented. 
The point of this description is to show that I thought about this alternative 
and consciously decided to use another one.

I believe that the TX state storage should use the same approach as the 
RocksDB-based partition storage. Its description can be found later in this 
issue.
h3. MV storage - Test engine

The Test engine uses a concurrent skip-list map for MV data and a bunch of 
other maps for indexes. While a snapshot is being downloaded, we should insert 
all data into new maps that have the same structure. In the end, we should have 
two versions of the partition: old and new.

{{onSnapshotLoad}} should just swap all objects. After that, the old partition 
data can be cleaned up by the garbage collector.
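
A minimal sketch of that "fill new maps, then swap references" idea, with 
purely hypothetical class and field names:
{code:java}
// A minimal sketch of the swap for the Test engine. All names here are hypothetical.
import java.util.Arrays;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;

class TestMvPartitionSketch {
    /** Holder for everything that belongs to one version of the partition. */
    static class PartitionMaps {
        final ConcurrentNavigableMap<byte[], byte[]> mvData = new ConcurrentSkipListMap<>(Arrays::compare);
        // ... index maps would live here as well
    }

    /** Current partition data; readers always go through this reference. */
    private final AtomicReference<PartitionMaps> current = new AtomicReference<>(new PartitionMaps());

    /** Partition data being filled from the incoming snapshot. */
    private volatile PartitionMaps incoming;

    void beginSnapshot() {
        incoming = new PartitionMaps();
    }

    void appendSnapshotRow(byte[] key, byte[] value) {
        incoming.mvData.put(key, value);
    }

    /** onSnapshotLoad: swap the references; the old maps become garbage. */
    void onSnapshotLoad() {
        current.set(incoming);
        incoming = null;
    }
}
{code}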

This is a good place to start the implementation. I assume that some new API 
will be introduced. I have thoughts about it as well; they are described later 
in the *API* section.
h3. MV storage - RocksDB engine

The SST-based approach is described in the *TX state storage* section, where I 
explain why I don't think it is a good solution. The same reasoning applies 
here just as well. This means that we should write data into the same RocksDB 
instance. This is a little bit tricky.

The reason is that all stored data is merged together, and Column Families are 
shared between different partitions. This makes it harder to find a place to 
write new partition data while the old partition data persists. As a reminder 
and an example, let's take a look at how data is stored in the row storage:

 
{code:java}
+-------------------+-----+-------------------+
|    Partition 0    | ... |   Partition MAX   |
+-------------------+-----+-------------------+
| Row1 | ... | RowN | ... | Row1 | ... | RowN |
+-------------------+-----+-------------------+{code}
Logically, the CF is split into different "blocks", and each block represents a 
partition. Currently, each partition block is associated with a 2-byte 
identifier that matches the partition number in big-endian encoding.

 

We could add a new CF with a similar structure and write snapshot data into it, 
but then the snapshot load operation would require us to move data from one CF 
to another. The only option that I know of that can do this is SST ingestion, 
and I already explained why I don't like it.

This leaves us with the necessity to write data into the same column family. A 
naturally occurring solution is to assign a new identifier to the "new" version 
of the partition. This way, replacing the "old" partition with the "new" one 
would be implemented by replacing "oldPartId" with "newPartId" in the table 
storage metadata.

Sounds good: no flush is required, and snapshot loading becomes pretty fast.

The only thing to keep in mind is that there are multiple column families per 
partition: row data, hash indexes, and a CF for every sorted index.

When "old" partition is deleted, we should probably somehow hint that RocksDB 
should merge some layers and remove a substantial amount of data from disk. But 
such optimization also relates to general partition eviction and is out of 
scope of the discussion.

Last point: what are "oldPartId" and "newPartId"?

As much as I would like to avoid adding new bytes to keys, we should probably 
replace the 2-byte partition number with a 3-byte prefix. I propose the 
following:

 
{code:java}
| MSB | LSB | Generation |{code}
Here MSB is the most significant byte of the partition number, LSB is the least 
significant byte, and Generation is a 1-byte counter. Every time we need a 
rebalance, we increase the generation of the current partition.
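
A sketch of building such a prefix, assuming plain big-endian encoding; the 
helper name is hypothetical:
{code:java}
// A sketch of the proposed 3-byte key prefix: partition number in big endian
// followed by a 1-byte generation. The helper name is hypothetical.
import java.nio.ByteBuffer;

class PartitionKeyPrefixSketch {
    static final int PREFIX_SIZE = 3;

    /** | MSB | LSB | Generation | */
    static byte[] prefix(int partitionId, int generation) {
        assert partitionId >= 0 && partitionId <= 0xFFFF : partitionId;
        assert generation >= 0 && generation <= 0xFF : generation;

        return ByteBuffer.allocate(PREFIX_SIZE)
                .putShort((short) partitionId) // ByteBuffer is big-endian by default
                .put((byte) generation)
                .array();
    }
}
{code}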

 

There are two alternatives: we either have 256 possible generations (rotating, 
of course) or only 2 (0 and 1). Why is this important?
Every time a partition is "started", it should, technically, perform a cleanup. 
Imagine, for example, that we have partition {{{}(0,23,g){}}}. Then we would 
have to clean up the following ranges (lower bound inclusive, upper bound 
exclusive):
 * for 256 generations - ranges {{(0,23,0):(0,23,g)}} and 
{{(0,23,g+1):(0,24,0)}}
 * for 2 generations - range {{(0,23,1-g):(0,23,2-g)}}

This should be done for all indexes as well. Something tells me that recovery 
will be somewhat faster in the second case, but is it more convenient? I don't 
know.
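
For the 256-generation variant, the cleanup could look roughly like this, 
reusing the hypothetical {{prefix()}} helper from the previous sketch and 
RocksDB's {{deleteRange}}; the wrap-around edge cases (generation 255, the last 
partition) are left out:
{code:java}
// A sketch of the startup cleanup for the 256-generation variant, expressed as two
// deleteRange calls that mirror the ranges in the list above. Names are hypothetical,
// and the wrap-around edge cases (generation 255, last partition) are not handled.
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class PartitionCleanupSketch {
    /** Removes every stale generation of the partition (bounds: lower inclusive, upper exclusive). */
    static void cleanupStaleGenerations(RocksDB db, ColumnFamilyHandle cf, int partitionId, int generation)
            throws RocksDBException {
        // (partitionId, 0) : (partitionId, g)
        db.deleteRange(cf,
                PartitionKeyPrefixSketch.prefix(partitionId, 0),
                PartitionKeyPrefixSketch.prefix(partitionId, generation));

        // (partitionId, g + 1) : (partitionId + 1, 0)
        db.deleteRange(cf,
                PartitionKeyPrefixSketch.prefix(partitionId, generation + 1),
                PartitionKeyPrefixSketch.prefix(partitionId + 1, 0));
    }
}
{code}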
h3. MV storage - Persistent Page Memory

This engine is way less complicated, but there are tricky parts as well. First 
of all, we can't have new partition IDs like in the RocksDB engine. A partition 
ID is 2 bytes, period. Too much code depends on this size being exactly two 
bytes.

Second, unlike RocksDB, each partition is stored in its own set of files. In 
other words, partitions are completely independent, which greatly simplifies 
the swapping procedure. I propose the following algorithm (a file-level sketch 
follows the list):
 * checkpoint everything before we start, or make sure that everything in the 
partition is checkpointed
 * invalidate all partition pages in page memory (this is {{{}O(1){}}}) and 
close all file descriptors (page stores)
 * rename all partition files to {{{}*.bak{}}}, for example
 * create a new partition and upload data into it
 * when the snapshot load is complete, remove the bak files {_}on the next 
checkpoint{_}. Otherwise, revert to the bak files using the same basic procedure
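
A file-level sketch of the rename and revert steps, assuming one {{*.bin}} 
page-store file per partition; the file extensions and helper names are 
assumptions for illustration only:
{code:java}
// A sketch of the rename/revert steps above, assuming one "<partition>.bin" page-store
// file per partition. File extensions and helper names are assumptions.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class PersistentPartitionSwapSketch {
    /** After the checkpoint and page invalidation: put the old partition file aside. */
    static void moveToBackup(Path binFile) throws IOException {
        Files.move(binFile, bakOf(binFile), StandardCopyOption.ATOMIC_MOVE);
    }

    /** Snapshot download failed: restore the old partition file from the backup. */
    static void revertFromBackup(Path binFile) throws IOException {
        Files.deleteIfExists(binFile); // partially uploaded new partition, if any
        Files.move(bakOf(binFile), binFile, StandardCopyOption.ATOMIC_MOVE);
    }

    static Path bakOf(Path binFile) {
        return binFile.resolveSibling(binFile.getFileName() + ".bak");
    }
}
{code}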

Local storage recovery should be implemented carefully. I propose having a 
rebalance marker that shows that a rebalance is in progress. On recovery:
 * if the marker is present, delete the {{bin}} files and rename the {{bak}} 
files to {{bin}}
 * if the marker is absent, remove the {{bak}} files

For this to work, we should always delete the marker files strictly before 
deleting the {{bak}} files, and do it (I repeat) only when the new partition 
files are checkpointed.
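
A sketch of that recovery rule, again with hypothetical file names:
{code:java}
// A sketch of the recovery rule above: the marker decides whether the bak files are
// the source of truth. File names are hypothetical.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class PartitionRecoverySketch {
    static void recover(Path marker, Path binFile, Path bakFile) throws IOException {
        if (Files.exists(marker)) {
            // Rebalance was interrupted: drop the incomplete new partition and restore the old one.
            Files.deleteIfExists(binFile);

            if (Files.exists(bakFile)) {
                Files.move(bakFile, binFile, StandardCopyOption.ATOMIC_MOVE);
            }
        } else {
            // Rebalance either finished or never started: leftover bak files can be removed.
            Files.deleteIfExists(bakFile);
        }
    }
}
{code}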
h3. MV storage - Volatile Page Memory

In the case of a volatile storage, there are no files that can be preserved. 
Instead, we can preserve the old partition "meta": pointers to all trees, free 
lists, or anything else useful. After that, the partition can be "deleted" (not 
really), and we start writing data into a new partition. Pretty much like in 
the Test storage engine, but off-heap.

When we're done, we can start deleting old partition data, using the same 
mechanism that's used in partition eviction (not implemented yet).
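
A very rough sketch of that idea; {{PartitionMeta}} and {{destroyAsync}} are 
hypothetical stand-ins for whatever the eviction mechanism ends up looking like:
{code:java}
// A rough sketch of the volatile engine idea: keep a reference to the old partition
// meta while the new one is being filled, then swap and destroy the old one in the
// background. PartitionMeta and destroyAsync are hypothetical.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class VolatilePartitionSwapSketch {
    /** Hypothetical holder of tree roots, free lists, etc. for one partition version. */
    interface PartitionMeta {
        CompletableFuture<Void> destroyAsync(); // same mechanism as partition eviction
    }

    private final AtomicReference<PartitionMeta> currentMeta = new AtomicReference<>();

    /** Installs the meta built from the snapshot and releases the old partition asynchronously. */
    CompletableFuture<Void> swap(PartitionMeta newMeta) {
        PartitionMeta oldMeta = currentMeta.getAndSet(newMeta);

        return oldMeta == null ? CompletableFuture.completedFuture(null) : oldMeta.destroyAsync();
    }
}
{code}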

> Download RAFT snapshot without deleting original partition data
> ---------------------------------------------------------------
>
>                 Key: IGNITE-17990
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17990
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
