[ https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ivan Bessonov updated IGNITE-17990:
-----------------------------------
Description:

h3. Description

In the first design, full rebalance is implemented this way:
* we drop the partition data
* we download partition data from the leader
* we're done

There's a problem with this approach: if the download fails, we lose one follower. This is bad, because the new leader may technically have more data in the log and could have uploaded it to the follower, but now that is impossible. Not only can this lead to hard-to-catch errors and custom code in JRaft, it is also an unconditional data deletion without either explicit user approval or a durably preserved copy of the data. Such an implementation is fine for a POC and some tests, but it cannot be allowed in the release version of the product.

h3. New proposed solution

As trivial as it may seem, the new solution is to _not delete data_ before the snapshot is fully downloaded and ready for the swap. Why is it trivial? Because this is literally what RAFT demands. Of course, there's a {*}but{*}: applying the snapshot, once it's downloaded, should be {{O(1)}} with respect to the number of rows in the partition and the number of transactions in the TX state store. This may not be fully achievable, depending on the implementation that we choose; more on that later.

The following sections describe all my concerns and possible implementations. Some sections can be skipped while reading, for example if you're not interested in a specific storage engine but want to read everything else.

h3. TX state storage

There's one really good thing about the TX state storage: it has no storage engine, there's only a single RocksDB-based implementation. This makes the following approach possible:
* while we stream data, we write it into an SST file, almost like in the snapshots of the meta-storage and CMG storages
* once the snapshot is downloaded, we ingest the file into the storage

What I like about this solution is that it's very simple.
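To make the streaming idea concrete, here is a rough sketch using the RocksJava API ({{SstFileWriter}} plus {{ingestExternalFile}}). The class name, the {{snapshotEntries}} parameter and the file location are assumptions for illustration, not the actual snapshot format:

```java
import java.util.List;
import org.rocksdb.*;

// Rough sketch only: stream snapshot entries into an SST file, then ingest it.
final class TxStateSnapshotSketch {
    static void apply(RocksDB db, List<byte[][]> snapshotEntries, String sstPath) throws RocksDBException {
        try (EnvOptions envOptions = new EnvOptions();
             Options options = new Options();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstPath);
            // Keys must be written in ascending order when building an SST file.
            for (byte[][] entry : snapshotEntries) {
                writer.put(entry[0], entry[1]);
            }
            writer.finish();
        }
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            // This is the step that may imply an implicit flush and a file copy.
            db.ingestExternalFile(List.of(sstPath), ingestOptions);
        }
    }
}
```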
But there are concerns:
* ingestion leads to an additional implicit data flush. Maybe it can be avoided; more on that later
* it's not clear whether RocksDB copies the SST file or not. I would assume that it does, because the file might be in another folder or on another device, for example. Although copying files is fast, it still takes time. Add the time required for the flush and we see a problem: the operation may become unnecessarily long

For these reasons, I don't think that such a solution should be implemented. The point of this description is to show that I considered this alternative and consciously decided to use another one. I believe that the TX state storage should use the same approach as the RocksDB-based partition storage; its description can be found later in this issue.

h3. MV storage - Test engine

The test engine uses a concurrent skip-list map for MV data and a bunch of other maps for indexes. While the snapshot is being downloaded, we should insert all data into new maps with the same structure. In the end, we have two versions of the partition: old and new. {{onSnapshotLoad}} should just swap all the objects. After that, the old partition data can be cleaned up by the garbage collector.

This is a good place to start the implementation. I assume that some new API will be introduced; I have thoughts about it as well, described later in the *API* section.

h3. MV storage - RocksDB engine

The SST-based approach is described in the *TX state storage* section, where I explain why I don't think it is a good solution. The same reasoning applies here just as well. This means that we should write data into the same RocksDB instance, which is a little bit tricky. The reason is that all stored data is merged together, and Column Families are shared between different partitions. This makes it harder to find a place to write the new partition data while the old partition data persists.
As a reminder and an example, let's take a look at how data is stored in the row storage:
{code:java}
+-------------------+-----+-------------------+
|    Partition 0    | ... |   Partition MAX   |
+-------------------+-----+-------------------+
| Row1 | ... | RowN | ... | Row1 | ... | RowN |
+-------------------+-----+-------------------+{code}
Logically, the CF is split into different "blocks", and each block represents a partition. Currently, each partition block is associated with a 2-byte identifier that matches the partition number in big-endian.

We could add a new CF with a similar structure and write the snapshot data into it, but then the snapshot load operation would require us to move data from one CF to another. The only option I know of that can do this is SST ingestion, and I already explained why I don't like it. This leaves us with the necessity to write data into the same column family.

The naturally occurring solution is to assign a new identifier to the "new" version of the partition. Replacing the "old" partition with the "new" one is then implemented by replacing "oldPartId" with "newPartId" in the table storage metadata. Sounds good: no flush is required, and snapshot loading becomes pretty fast. The only thing to keep in mind is that there are multiple column families in each partition: row data, hash indexes and a CF for every sorted index.

When the "old" partition is deleted, we should probably somehow hint that RocksDB should merge some levels and remove a substantial amount of data from disk. But such an optimization also relates to general partition eviction and is out of scope of this discussion.

Last point: what are "oldPartId" and "newPartId"? As much as I would like to avoid adding new bytes to keys, we should probably replace the 2-byte partition number with a 3-byte one. I propose the following:
{code:java}
| MSB | LSB | Generation |{code}
Where MSB is the most significant byte of the partition number, LSB is the least significant byte, and Generation is a 1-byte counter.
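For illustration, the 3-byte prefix could be encoded like this. This is a minimal sketch; {{encodePrefix}} and the class name are hypothetical helpers, not existing Ignite code:

```java
import java.util.Arrays;

// Sketch of the proposed | MSB | LSB | Generation | key prefix:
// a big-endian partition number followed by a 1-byte generation counter.
final class PartitionPrefix {
    static byte[] encodePrefix(int partitionId, int generation) {
        return new byte[] {
            (byte) (partitionId >>> 8), // most significant byte of the partition number
            (byte) partitionId,         // least significant byte
            (byte) generation           // 1-byte rotating generation
        };
    }

    public static void main(String[] args) {
        // Partition 23, generation 5 -> [0, 23, 5]
        System.out.println(Arrays.toString(encodePrefix(23, 5)));
    }
}
```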
Every time we need a rebalance, we increase the generation of the current partition. There are alternatives: we either have 256 possible generations (rotating, of course) or only 2 (0 and 1). Why is this important? Every time a partition is "started", it should, technically, perform a cleanup. Imagine (for example) that we have partition {{{}(0,23,g){}}}. Then we would have to clean up the following ranges (lower bound inclusive, upper bound exclusive):
* for 256 generations - ranges {{(0,23,0):(0,23,g)}} and {{(0,23,g+1):(0,24,0)}}
* for 2 generations - range {{(0,23,1-g):(0,23,2-g)}}

This should be done for all indexes as well. Something tells me that recovery will be somewhat faster in the second case, but is it more convenient? I don't know.

Lastly, why is it OK to add a byte to the prefix? Don't we increase the footprint? Yes and no. Right now the engine is not yet properly tuned, but in the future we may set it up in such a way that RocksDB trims prefixes from the keys, so the size is mostly irrelevant. Whether we configure the prefix to be the partition id or the pair (partId, rowId), I don't know yet. Both options look good, but the second may be better. We should implement both and benchmark them.

h3. MV storage - Persistent Page Memory

This engine is way less complicated, but there are tricky parts as well. First of all, we can't have new partition ids like in the RocksDB engine: it's 2 bytes, period. Too much code depends on this size being exactly two bytes. Second, unlike RocksDB, each partition is stored in its own set of files. In other words, partitions are completely independent, which greatly simplifies the swapping procedure.
I propose the following algorithm:
* checkpoint everything before we start, or make sure that everything in the partition is checkpointed
* invalidate all partition pages in page memory (this is {{{}O(1){}}}) and close all file descriptors (page stores)
* rename all partition files to {{{}*.bak{}}}, for example
* create a new partition and upload data into it
* when the snapshot load is completed, remove the bak files {_}on the next checkpoint{_}; revert to the bak files otherwise, using the same basic procedure

Local storage recovery should be implemented carefully. I propose having a rebalance marker that shows that a rebalance is in progress. On recovery:
* if the marker is present, delete the {{bin}} files and rename the {{bak}} files to {{bin}}
* if the marker is absent, remove the {{bak}} files

For this to work, we should always delete marker files strictly before deleting {{bak}} files, and do it (I repeat) only when the new partition files are checkpointed.

h3. MV storage - Volatile Page Memory

In the case of a volatile storage, there are no files that can be preserved. Instead, we can preserve the old partition "meta": pointers to all trees, free lists and anything else useful. After that, the partition can be deleted (not really) and we start writing data into a new partition. Pretty much like in the test storage engine, but offheap.

When we're done, we can start deleting the old partition data, using the same mechanism that's used in partition eviction (IGNITE-17833, not implemented yet). If the snapshot download is interrupted, we bring back the old meta and asynchronously delete everything that we already downloaded, again reusing the partition eviction code. No memory leaks should be left behind.

h3. Atomic snapshot load in case of multiple storages

At every step, every operation may fail, but data consistency must be preserved no matter what. Here, in particular, we need to call two {{onSnapshotLoad}} methods atomically. Their implementations may be very different.
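Stepping back to the persistent page-memory engine for a moment, its marker-based recovery can be sketched with plain file operations. This is a hedged sketch under assumptions: the file extensions, the {{rebalance.marker}} name and the directory layout are all hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of local recovery for one partition directory.
final class PartitionRecovery {
    static void recover(Path partitionDir) throws IOException {
        Path marker = partitionDir.resolve("rebalance.marker");
        boolean rebalanceInProgress = Files.exists(marker);

        List<Path> entries;
        try (Stream<Path> s = Files.list(partitionDir)) {
            entries = s.collect(Collectors.toList());
        }

        for (Path f : entries) {
            String name = f.getFileName().toString();
            if (rebalanceInProgress && name.endsWith(".bin")) {
                Files.delete(f); // incomplete rebalance: new partition files are untrusted
            } else if (!rebalanceInProgress && name.endsWith(".bak")) {
                Files.delete(f); // rebalance finished earlier: backups are obsolete
            }
        }

        if (rebalanceInProgress) {
            for (Path f : entries) {
                String name = f.getFileName().toString();
                if (name.endsWith(".bak")) {
                    String restored = name.substring(0, name.length() - ".bak".length()) + ".bin";
                    Files.move(f, f.resolveSibling(restored));
                }
            }
            // The marker is deleted strictly last, after the backups are restored.
            Files.delete(marker);
        }
    }
}
```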
At a high level, the operation may look like this:
* write an operation marker somewhere (table folder, vault, doesn't matter). Before doing so, we need to make sure that data is persisted on disk for both storages. Once the marker is created, there's no way back: the old partition data will be destroyed
* call both methods
* remove the marker when the load is completed

Pretty simple. Just do the same thing on recovery if the marker is present. One thing to keep in mind: {{onSnapshotLoad}} should be idempotent for this to work. If the new partition is already loaded, nothing should break; loading should effectively become a no-op in that case.

h3. API

I realize that the MvPartitionStorage interface is slowly becoming a mess. There's not much that we can do about it, but rebalance is a good exception to the rule. Basically, we can implement something like this:
{code:java}
void performLocalRecovery();

MvRebalanceDataWriter startDataRebalancing();

interface MvRebalanceDataWriter {
    CompletableFuture<?> beforeWritingData();

    void addCommitted(...);

    // Doesn't return the old value, because that's actually pointless during rebalancing.
    void addUncommitted(...);

    // lastAppliedIndex, runConsistently, etc.

    CompletableFuture<?> afterWritingData();

    void close();
}{code}
What exactly we do in {{onSnapshotLoad}} and which interfaces it uses, I'm not sure at the moment. I just hope that this brief description gives you the gist of what I would like to see in the code. I do believe that it will simplify the API at least a little bit.

What about indexes? That's a good question. I would expect that old objects created with {{{}getOrCreate*Index{}}} should still be functional. It may be a pain in the rear; we may have to introduce default implementations with changeable delegates. It's hard to predict exactly, but this is definitely a part that also requires attention.

h3. Conclusion

I know that this is a pretty big task. I don't expect it to be done in one sitting; it should be split into at least 3 issues, probably more.
This is fine, just don't forget the link to this particular issue, because it has the overall description of what's going on.
> Download RAFT snapshot without deleting original partition data
> ---------------------------------------------------------------
>
>                 Key: IGNITE-17990
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17990
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
--
This message was sent by Atlassian Jira
(v8.20.10#820010)