[ https://issues.apache.org/jira/browse/IGNITE-17081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ivan Bessonov updated IGNITE-17081:
-----------------------------------
    Description:
Please refer to https://issues.apache.org/jira/browse/IGNITE-16907 for prerequisites.

Please also familiarize yourself with https://issues.apache.org/jira/browse/IGNITE-17077 for better understanding; the description is continued from there.

For RocksDB-based storage the recovery process is trivial, because RocksDB has its own WAL. So, for testing purposes, it would be enough to just store the update index in the meta column family.

Immediately we have a write amplification issue, on top of possible performance degradation. The obvious solution is inherently bad and needs to be improved.

h2. General idea & implementation

Obviously, the WAL needs to be disabled (WriteOptions#setDisableWAL). This breaks the RocksDB recovery procedure, so we need to take measures to avoid that.

The only feasible way to do so is to use DBOptions#setAtomicFlush in conjunction with org.rocksdb.WriteBatchWithIndex. This allows RocksDB to save all column families consistently when batches cover several CFs. Basically, {{acquireConsistencyLock()}} would create a thread-local write batch that is applied on lock release. Most of RocksDbMvPartitionStorage will be affected by this change.

NOTE: I believe that scans with unapplied batches should be prohibited for now (luckily, there's WriteBatchInterface#count() to check). I don't see any practical value in supporting them, or a proper way of implementing it, considering how spread out in time the scan process is.

h2. Callbacks and RAFT snapshots

Simply storing and reading the update index is easy. Reading the committed index is more challenging; I propose caching it and updating it only from the closure, which can also be used by RAFT to truncate the log.

For a closure, there are several things to account for during the implementation:
 * DBOptions#setListeners. We need two events - ON_FLUSH_BEGIN and ON_FLUSH_COMPLETED. All "completed" events go after all "begin" events in atomic flush mode.
And, once you have your first "completed" event, you have a guarantee that *all* memtables are already persisted.
This allows easy tracking of RocksDB flushes; monitoring the alternation of events is all that's needed.
 * Unlike the PDS implementation, here we will be writing the updateIndex value into a memtable every time. This makes it harder to find persistedIndex values for partitions. Luckily, considering the events that we have, during the time between the first "completed" and the very next "begin", the state on disk is fully consistent. And there's a way to read data from storage avoiding the memtable completely - ReadOptions#setReadTier(PERSISTED_TIER).

Summarizing everything from the above, we should implement the following protocol:

{code:java}
During table start: read latest values of update indexes. Store them in an in-memory structure.
Set "lastEventType = ON_FLUSH_COMPLETED;".

onFlushBegin:
  if (lastEventType == ON_FLUSH_BEGIN)
    return;

  waitForLastAsyncUpdateIndexesRead();

  lastEventType = ON_FLUSH_BEGIN;

onFlushCompleted:
  if (lastEventType == ON_FLUSH_COMPLETED)
    return;

  asyncReadUpdateIndexesFromDisk();

  lastEventType = ON_FLUSH_COMPLETED;
{code}

Reading values from disk must be performed asynchronously to not stall the flushing process. We don't control the locks that RocksDB holds while calling the listener's methods.

That asynchronous process would invoke closures that provide persisted updateIndex values to other components.

NOTE: One might say that we should call "waitForLastAsyncUpdateIndexesRead();" as late as possible just in case. But my implementation calls it during the first event. This is fine. I noticed that column families are flushed in order of their internal ids. These ids correspond to the creation sequence number of the CFs, and the "default" CF is always created first. This is the exact CF that we use to store meta. Maybe we're going to change this and create a separate meta CF.
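The flush-event protocol above can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions, not the actual Ignite implementation: the class name {{FlushIndexTracker}}, the {{diskReader}} supplier, the {{onPersisted}} closure and the {{persistedIndex}} accessor are all hypothetical. The supplier stands in for a read done with ReadOptions#setReadTier(PERSISTED_TIER), and the two callbacks are assumed to be invoked from a listener registered through DBOptions#setListeners.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical tracker of persisted update indexes, driven by flush events. */
final class FlushIndexTracker {
    enum EventType { ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED }

    /** Assumption: returns persisted update indexes keyed by partition id, bypassing memtables. */
    private final Supplier<Map<Integer, Long>> diskReader;

    /** Assumption: closure that hands persisted indexes to other components (e.g. RAFT). */
    private final Consumer<Map<Integer, Long>> onPersisted;

    /** In-memory cache of the last known persisted index per partition. */
    private final Map<Integer, Long> persistedIndexes = new ConcurrentHashMap<>();

    private EventType lastEventType = EventType.ON_FLUSH_COMPLETED;

    /** Last asynchronous read; starts out completed so the first "begin" does not block. */
    private CompletableFuture<Void> lastRead = CompletableFuture.completedFuture(null);

    FlushIndexTracker(Supplier<Map<Integer, Long>> diskReader,
                      Consumer<Map<Integer, Long>> onPersisted) {
        this.diskReader = diskReader;
        this.onPersisted = onPersisted;
        // Table start: read the latest values synchronously and cache them.
        persistedIndexes.putAll(diskReader.get());
    }

    synchronized void onFlushBegin() {
        // Only the first "begin" of a flush round matters.
        if (lastEventType == EventType.ON_FLUSH_BEGIN) {
            return;
        }
        // The on-disk state is about to change: wait for the previous read to finish.
        lastRead.join();
        lastEventType = EventType.ON_FLUSH_BEGIN;
    }

    synchronized void onFlushCompleted() {
        // Only the first "completed" of a flush round matters.
        if (lastEventType == EventType.ON_FLUSH_COMPLETED) {
            return;
        }
        // Read off the calling thread so the flush itself is not stalled.
        lastRead = CompletableFuture.runAsync(() -> {
            Map<Integer, Long> indexes = diskReader.get();
            persistedIndexes.putAll(indexes);
            onPersisted.accept(indexes);
        });
        lastEventType = EventType.ON_FLUSH_COMPLETED;
    }

    /** Returns the cached persisted index, or 0 if the partition is unknown. */
    long persistedIndex(int partitionId) {
        return persistedIndexes.getOrDefault(partitionId, 0L);
    }
}
```

In this sketch the cached value is refreshed at most once per flush round, and the only blocking call is the {{join()}} in the first "begin" event, which matches the placement of "waitForLastAsyncUpdateIndexesRead();" discussed in the NOTE.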
Optimizing where "waitForLastAsyncUpdateIndexesRead();" is called would make sense only once a separate meta CF exists, and only if we have actual proof that there's a stall in this exact place.

> Implement checkpointIndex for RocksDB
> -------------------------------------
>
>                 Key: IGNITE-17081
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17081
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)