[ 
https://issues.apache.org/jira/browse/IGNITE-16907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Bessonov reassigned IGNITE-16907:
--------------------------------------

    Assignee: Ivan Bessonov

> Add ability to use Raft log as storage WAL within the scope of local recovery
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-16907
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16907
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Alexander Lapin
>            Assignee: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
>
> h4. Problem
> From a bird's eye view, the raft-to-storage flow looks like this:
>  # 
> {code:java}
> RaftGroupService#run(writeCommand());{code}
>  # Inner raft replication logic; once the command is replicated to the majority, raft.committedIndex is adjusted.
>  # Propagate command to RaftGroupListener (raft state machine).
> {code:java}
> RaftGroupListener#onWrite(closure(writeCommand()));{code}
>  # Within the state machine, insert the data from the writeCommand into the underlying storage:
> {code:java}
> var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());{code}
>  # Ack that the data was applied successfully:
> {code:java}
> clo.result(insertRes);{code}
>  # Move raft.appliedIndex to the corresponding value, meaning that the data for this index has been applied to the state machine (a rough sketch of steps 4-6 follows this list).
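> A minimal, self-contained sketch of steps 4-6; all types and names below are illustrative placeholders, not the actual Ignite 3 API:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.function.Consumer;
> 
> // Hypothetical state machine fragment illustrating steps 4-6.
> class StateMachineSketch {
>     /** Hypothetical write command: a single key/value insert plus its raft log index. */
>     record InsertCommand(String key, String value, long index) { }
> 
>     private final Map<String, String> storage = new HashMap<>(); // stands in for the real storage
>     private long appliedIndex;
> 
>     void onWrite(InsertCommand cmd, Consumer<Boolean> resultClosure) {
>         // Step 4: apply the command to the underlying storage (it may only be buffered in memory).
>         storage.put(cmd.key(), cmd.value());
> 
>         // Step 5: ack that the data was applied successfully.
>         resultClosure.accept(true);
> 
>         // Step 6: move appliedIndex to the command's index.
>         appliedIndex = cmd.index();
>     }
> }
> {code}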
> The most interesting part, especially for this ticket, relates to step 4.
> In the real world, storage doesn't flush every mutator to disk; instead it buffers 
> some amount of such mutators and flushes them all together as part of a 
> checkpointing process. Thus, if the node fails before mutatorsBuffer.flush(), it 
> may lose some data, because on recovery raft will apply data starting from 
> appliedIndex + 1.
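> A minimal sketch of this failure window, assuming an in-memory mutators buffer that is only flushed on checkpoint (BufferedStorageSketch and its fields are illustrative, not real code):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> 
> // Illustrative buffered storage: mutators live in memory until a checkpoint flushes them.
> class BufferedStorageSketch {
>     private final List<String> mutatorsBuffer = new ArrayList<>(); // not yet on disk
>     private final List<String> disk = new ArrayList<>();           // persisted state
> 
>     void insert(String row) {
>         // The mutator is only buffered, yet appliedIndex still advances in the state machine.
>         mutatorsBuffer.add(row);
>         // If the node fails here, everything in mutatorsBuffer is lost, while raft
>         // recovery will replay the log only from appliedIndex + 1.
>     }
> 
>     void checkpoint() {
>         disk.addAll(mutatorsBuffer); // flush all buffered mutators together
>         mutatorsBuffer.clear();
>     }
> }
> {code}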
> h4. Possible solutions:
> There are several possibilities to solve this issue:
>  # In-storage WAL. A bad solution, because there's already a raft log that can be used as a WAL; such duplication is redundant.
>  # Local recovery starting from appliedIndex - mutatorsBuffer.size. A bad solution: it won't work for non-idempotent operations and exposes inner storage details such as mutatorsBuffer.size.
>  # proposedIndex propagation + checkpointIndex synchronization. Seems fine. More details below:
>  * First of all, in order to coordinate the raft replicator and the storage, proposedIndex should be propagated to the raftGroupListener and the storage.
>  * On every checkpoint, the storage will persist the corresponding proposed index as checkpointIndex.
>  ** In case of the storage's inner checkpoints, the storage won't notify the raft replicator about the new checkpointIndex. This kind of notification is an optimization that does not affect the correctness of the protocol.
>  ** In case of an outer checkpoint intention, e.g. raft snapshotting for the purposes of raft log truncation, the corresponding checkpointIndex will be propagated to the raft replicator within the "onSnapshotDone" callback.
>  * During local recovery, raft will apply raft log entries from the very beginning. If checkpointIndex turns out to be bigger than the proposedIndex of a given raft log entry, the storage fails the proposed closure with IndexMismatchException(checkpointIndex), which leads to a proposedIndex shift and an optional async raft log truncation (see the sketch after this list).
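> A rough sketch of the recovery-side check described above; IndexMismatchException and the surrounding names are assumptions for illustration, not the actual API:
> {code:java}
> // Illustrative recovery-side check: reject replayed entries already covered by a checkpoint.
> class RecoverySketch {
>     static class IndexMismatchException extends RuntimeException {
>         final long checkpointIndex;
> 
>         IndexMismatchException(long checkpointIndex) {
>             super("Entry is already covered by checkpointIndex " + checkpointIndex);
>             this.checkpointIndex = checkpointIndex;
>         }
>     }
> 
>     private long checkpointIndex; // persisted together with the last checkpoint
> 
>     /** Called for every raft log entry replayed during local recovery. */
>     void applyOnRecovery(long proposedIndex, Runnable applyCommand) {
>         if (proposedIndex <= checkpointIndex) {
>             // The command is already covered by checkpointed data: fail the closure so the
>             // replicator shifts proposedIndex to checkpointIndex + 1 (and may truncate the log).
>             throw new IndexMismatchException(checkpointIndex);
>         }
> 
>         applyCommand.run();
>     }
> }
> {code}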
> Let's consider the following example: checkpointBuffer = 3. [P] - persisted entities, [!P] - not persisted / in-memory ones.
>  # raft.put(k1,v1)
>  ## -> raftlog[cmd(k1,v1, index:1)]
>  ## -> storage[(k1,v1), index:1]
>  ## -> appliedIndex:1
>  # raft.put(k2,v2)
>  ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2)]
>  ## -> storage[(k1,v1), (k2,v2), index:2]
>  ## -> appliedIndex:2
>  # raft.put(k3,v3)
>  ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
>  ## -> storage[(k1,v1), (k2,v2), (k3,v3), index:3]
>  ## -> appliedIndex:3
>  ## *inner storage checkpoint*
>  ### raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, index:3)]
>  ### storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3, 
> proposedIndex:3)]
>  ### checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
>  # raft.put(k4,v4)
>  ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
>  ## -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
>  ## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
>  ## -> appliedIndex:4
>  # Node failure
>  # Node restart
>  ## StorageRecovery: storage.apply(checkpointedData)
>  ## raft-to-storage data application starting from index: 1 // raft doesn't know checkpointIndex at this point.
>  ### -> storageResponse::IndexMismatchException(3)
>  ####  raft-to-storage data application starting from index: 3 + 1
>  # Recovery result:
>  ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, 
> index:3), cmd(k4,v4, index:4)]
>  ## -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
>  ## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
>  ## -> appliedIndex:4
>  # Raft log truncation (sketched after this example)
>  ## storage.forceCheckpoint
>  ### -> raftlog[index:4]
>  ### -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
>  ### -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4), checkpointIndex:4]
>  ### -> appliedIndex:4
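> A rough sketch of the outer-checkpoint path used for raft log truncation; forceCheckpoint, onSnapshotDone and the fields below are assumptions for illustration:
> {code:java}
> // Illustrative outer checkpoint: persist checkpointIndex and report it back to the replicator.
> class SnapshotSketch {
>     interface SnapshotDoneCallback {
>         void onSnapshotDone(long checkpointIndex);
>     }
> 
>     private long lastProposedIndex; // latest index proposed to the storage
>     private long checkpointIndex;   // index persisted together with the checkpointed data
> 
>     /** Outer checkpoint intention, e.g. before truncating the raft log. */
>     void forceCheckpoint(SnapshotDoneCallback callback) {
>         // Flush buffered mutators and persist the corresponding proposed index ("storage.forceCheckpoint").
>         checkpointIndex = lastProposedIndex;
> 
>         // Report the new checkpointIndex so the replicator can truncate the log up to it.
>         callback.onSnapshotDone(checkpointIndex);
>     }
> }
> {code}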



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
