Ivan Bessonov reassigned IGNITE-16907:
--------------------------------------

Assignee: Ivan Bessonov

Add ability to use Raft log as storage WAL within the scope of local recovery
------------------------------------------------------------------------------

Key: IGNITE-16907
URL: https://issues.apache.org/jira/browse/IGNITE-16907
Project: Ignite
Issue Type: Improvement
Reporter: Alexander Lapin
Assignee: Ivan Bessonov
Priority: Major
Labels: ignite-3

h4. Problem

From a bird's-eye view, the raft-to-storage flow looks like this:
# Submit a write command to the raft group:
{code:java}
RaftGroupService#run(writeCommand());{code}
# Inner raft replication logic: once the command is replicated to a majority, raft advances raft.committedIndex.
# Propagate the command to the RaftGroupListener (the raft state machine):
{code:java}
RaftGroupListener#onWrite(closure(writeCommand()));{code}
# Within the state machine, insert the data from the write command into the underlying storage:
{code:java}
var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());{code}
# Ack that the data was applied successfully:
{code:java}
clo.result(insertRes);{code}
# Move raft.appliedIndex to the corresponding value, meaning that the data for this index has been applied to the state machine.

The most interesting part, especially for this ticket, relates to step 4. In the real world the storage doesn't flush every mutator to disk; instead, it buffers some amount of such mutators and flushes them all together as part of a checkpointing process. Thus, if the node fails before mutatorsBuffer.flush(), it may lose data: the buffered mutators are gone, yet raft considers them applied and on recovery will only replay entries starting from appliedIndex + 1, so the lost entries are never re-applied.

h4. Possible solutions

There are several possibilities to solve this issue:
# In-storage WAL. A bad solution: there is already a raft log that can be used as a WAL, so such duplication is redundant.
# Local recovery starting from appliedIndex - mutatorsBuffer.size. A bad solution: it won't work for non-idempotent operations, and it exposes inner storage details such as mutatorsBuffer.size.
# proposedIndex propagation + checkpointIndex synchronization. Seems fine. More details below (a code sketch follows this list):
* First of all, in order to coordinate the raft replicator and the storage, the proposedIndex should be propagated to the RaftGroupListener and the storage.
* On every checkpoint, the storage persists the corresponding proposed index as the checkpointIndex.
** In the case of inner storage checkpoints, the storage won't notify the raft replicator about the new checkpointIndex. This kind of notification is an optimization that does not affect the correctness of the protocol.
** In the case of an outer checkpoint intention, e.g. raft snapshotting for the purpose of raft log truncation, the corresponding checkpointIndex will be propagated to the raft replicator within an "onSnapshotDone" callback.
* During local recovery, raft applies raft log entries from the very beginning. If the checkpointIndex turns out to be greater than or equal to the proposedIndex of a replayed raft log entry, the storage fails the proposed closure with IndexMismatchException(checkpointIndex), which leads to a proposedIndex shift and an optional async raft log truncation.
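The sketch below makes this coordination concrete. It is a minimal, hypothetical illustration, not the real Ignite 3 API: Storage, StorageStateMachine, IndexMismatchException and their members (insert, checkpoint, onWrite) are simplified stand-ins, and the actual mutator buffer and flush machinery is reduced to a plain map and a comment.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Thrown by the storage when a replayed entry is already covered by the last
 * checkpoint; carries the checkpointIndex so that raft can shift the replay position.
 */
class IndexMismatchException extends RuntimeException {
    final long checkpointIndex;

    IndexMismatchException(long checkpointIndex) {
        super("Entry is already checkpointed, checkpointIndex=" + checkpointIndex);
        this.checkpointIndex = checkpointIndex;
    }
}

/** Storage that tracks the raft index proposed together with every mutator. */
class Storage {
    private final Map<String, String> buffered = new HashMap<>(); // in-memory, not yet flushed
    private long proposedIndex;                                    // index of the last applied mutator
    private volatile long checkpointIndex;                         // persisted on every checkpoint

    void insert(String key, String value, long proposedIdx) {
        // During local recovery the raft log is replayed from the very beginning:
        // reject entries that the last checkpoint already covers.
        if (proposedIdx <= checkpointIndex) {
            throw new IndexMismatchException(checkpointIndex);
        }

        buffered.put(key, value);
        proposedIndex = proposedIdx;
    }

    /** Flushes the buffered mutators and persists the proposed index as the checkpoint index. */
    void checkpoint() {
        // flushToDisk(buffered); // stands in for the real checkpointing machinery
        checkpointIndex = proposedIndex;
    }
}

/** Plays the role of the RaftGroupListener: threads the command index through to the storage. */
class StorageStateMachine {
    final Storage storage = new Storage();

    /** Invoked by the raft replicator for every committed write command. */
    void onWrite(long commandIndex, String key, String value, Consumer<Object> closure) {
        try {
            storage.insert(key, value, commandIndex);
            closure.accept(Boolean.TRUE); // ack: raft may advance appliedIndex
        } catch (IndexMismatchException e) {
            closure.accept(e);            // raft continues replay from checkpointIndex + 1
        }
    }
}
{code}
The only contract that matters here is that every mutator carries its raft index and that the storage persists the last applied index atomically with the checkpoint, so an already-checkpointed prefix of the log can be detected and rejected on replay.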
Let's consider the following example, with checkpointBuffer = 3. Entries added or updated at each step are marked in *bold*; [P] stands for persisted entries, [!P] for not-persisted / in-memory ones.
# raft.put(k1,v1)
## -> raftlog[cmd(k1,v1, index:1)]
## -> storage[(k1,v1), index:1]
## -> appliedIndex:1
# raft.put(k2,v2)
## -> raftlog[cmd(k1,v1, index:1), *cmd(k2,v2, index:2)*]
## -> storage[(k1,v1), *(k2,v2)*, index:*2*]
## -> appliedIndex:*2*
# raft.put(k3,v3)
## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), *cmd(k3,v3, index:3)*]
## -> storage[(k1,v1), (k2,v2), *(k3,v3)*, index:*3*]
## -> appliedIndex:*3*
## *inner storage checkpoint*
### raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
### storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3, proposedIndex:3)]
### *checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]*
# raft.put(k4,v4)
## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), *cmd(k4,v4, index:4)*]
## -> storage[(k1,v1), (k2,v2), (k3,v3), *(k4,v4)*, index:*4*]
## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
## -> appliedIndex:*4*
# Node failure
# Node restart
## StorageRecovery: storage.apply(checkpointedData)
## raft-to-storage data application starting from index:1 // raft doesn't know the checkpointIndex at this point
### -> storageResponse::IndexMismatchException(3)
#### raft-to-storage data application restarts from index:3 + 1
# Recovery result:
## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
## -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
## -> appliedIndex:4
# Raft log truncation
## storage.forceCheckpoint
### -> raftlog[index:4]
### -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
### -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4), checkpointIndex:4]
### -> appliedIndex:4
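To close the loop, here is a sketch of the restart flow from steps 6-7 of the trace above. It reuses the hypothetical Storage and IndexMismatchException from the previous sketch; the Entry class and the recover() driver are likewise illustrative only.
{code:java}
import java.util.List;

/** Illustrative recovery driver: replays the raft log, skipping checkpointed entries. */
class LocalRecovery {
    /** Simplified raft log entry. */
    static final class Entry {
        final long index;
        final String key;
        final String value;

        Entry(long index, String key, String value) {
            this.index = index;
            this.key = key;
            this.value = value;
        }
    }

    static void recover(List<Entry> raftLog, Storage storage) {
        // Step 7.1: storage.apply(checkpointedData) has already restored the
        // checkpointed state, including the persisted checkpointIndex (3 in the trace).

        long nextIndex = 1; // raft does not know the checkpointIndex at this point

        for (Entry e : raftLog) {
            if (e.index < nextIndex) {
                continue; // already covered by the checkpoint, skip without re-applying
            }

            try {
                storage.insert(e.key, e.value, e.index);
            } catch (IndexMismatchException ex) {
                // Proposed index shift: continue from checkpointIndex + 1 (3 + 1 in the trace).
                nextIndex = ex.checkpointIndex + 1;
            }
        }
        // Optionally, the raft log prefix up to the checkpointIndex can now be truncated asynchronously.
    }
}
{code}
With this scheme the raft log itself serves as the WAL: nothing that reached the log is lost, and non-idempotent commands are never applied to the storage twice.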