[ https://issues.apache.org/jira/browse/IGNITE-17252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vladislav Pyatkov reassigned IGNITE-17252:
------------------------------------------

Assignee: Vladislav Pyatkov

Introduce Replica, ReplicaServer(?), ReplicaService and ReplicaListener interfaces
----------------------------------------------------------------------------------

                Key: IGNITE-17252
                URL: https://issues.apache.org/jira/browse/IGNITE-17252
            Project: Ignite
         Issue Type: Improvement
           Reporter: Alexander Lapin
           Assignee: Vladislav Pyatkov
           Priority: Major
             Labels: ignite-3, transaction3_rw

h2. General context

According to the tx design document, a new abstraction called *primary replica* is introduced in order to decouple the replication engine (e.g. Raft) from the business logic:

{code:java}
A primary replica is a replica which serves a special purpose in the transaction protocol. Only one primary replica can exist at a time. Each replica is identified by a liveness interval (startTs, endTs). All such intervals are disjoint, so a new primary replica's liveness interval can't overlap with the previous one. Timestamps used for defining the intervals must be comparable with the timestamps assigned to committing transactions. For example, HLC timestamps can be used for this purpose.

The primary replica is used to execute the CC protocol (so all reads and writes go through it), thus maintaining serializable executions, as described in the next section.

The simplest implementation would be piggy-backing on the RAFT protocol, tying the primary replica to the RAFT leader. See the leaseholder section of the RAFT paper for details. In this approach, the RAFT leader node is identical to the primary replica node. The endTs is continuously extended using RAFT heartbeating.

A primary replica's status can be voluntarily transferred to another replica. This is only possible after its liveness interval expires. This can be useful, for example, for balancing RAFT leaders.
{code}

Besides the obvious lease-based disjoint replication leader detection, the primary replica is also responsible for handling messages, acting as a storage and replication pre- and post-processor. It is up to the replica to:
* acquire, release and await locks;
* propagate requests directly to the storage;
* convert a message to an appropriate replication (Raft) command and propagate it to the replication engine.

Let's check the following example.

*As-Is (currently):*
{code:java}
// client-side
InternalTable.upsert()
    enlistInTx()
    raftService.run(upsertCommand)
        raftGroupService.sendWithRetry(ActionRequest.of(upsertCommand))
            messagingService().invoke(actionRequest)

// server-side
ActionRequestProcessor.handleRequest(actionRequest)
    // Lock management
    future = JraftServerImpl.DelegatingStateMachine.getListener().onBeforeApply(request.command());
    future.handle(actionRequest.command() instanceof WriteCommand
        ? applyWrite(actionRequest)
        : applyRead(actionRequest))
{code}

Please pay attention to the *onBeforeApply* step. It was introduced in order to manage (acquire) locks, with any further waiting for locks happening *outside* the raft pipeline: it is critical not to occupy the linearized in-raft execution with such lengthy operations as waiting for locks to be released.

It is worth mentioning that this approach has several disadvantages: e.g. the onBeforeApply step is executed before the isLeader() check, so it might acquire a lock on a non-leader node, which is not the expected behavior. The sketch below illustrates this ordering problem.
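A hypothetical, simplified model of that ordering problem follows; all types and method names here are stand-ins based on the snippet above, not the real JRaft/Ignite API:

{code:java}
import java.util.concurrent.CompletableFuture;

// Sketch only: shows why acquiring locks in onBeforeApply before the
// leadership check is problematic. Every type here is a stand-in.
class AsIsOrderingSketch {
    interface Command { }
    interface WriteCommand extends Command { }

    /** Stand-in for the state machine listener exposing the onBeforeApply hook. */
    interface Listener {
        CompletableFuture<Void> onBeforeApply(Command command); // acquires/awaits locks
    }

    private Listener listener;

    CompletableFuture<Object> handleRequest(Command command) {
        // Step 1: lock management runs first, outside of the raft pipeline ...
        CompletableFuture<Void> locksAcquired = listener.onBeforeApply(command);

        // Step 2: ... but leadership is validated only inside applyWrite/applyRead,
        // so a non-leader node may already hold locks it should never have taken.
        return locksAcquired.thenCompose(ignored ->
                command instanceof WriteCommand
                        ? applyWrite((WriteCommand) command)
                        : applyRead(command));
    }

    CompletableFuture<Object> applyWrite(WriteCommand c) { /* isLeader() check, then replicate */ return CompletableFuture.completedFuture(null); }

    CompletableFuture<Object> applyRead(Command c) { /* isLeader() check, then local read */ return CompletableFuture.completedFuture(null); }
}
{code}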
*To-Be (should be implemented):*
{code:java}
// client-side
InternalTable.upsert()
    enlistInTx()
    replicaService.invoke(upsertRequest, primary=true)

// server-side
Replica.handleRequest(actionRequest)
    if (actionRequest.isPrimaryEvaluationExpected())
        checkLease(); // Return failure if not valid

    if (actionRequest instanceof WriteRequest) {
        // validate writeRequest locally

        // acquire all locks !locally!
        fut = txManager.intentWriteLock(table);

        fut.handle(() ->
            future.of(async(replicationEngine.replicate(ReplicationCommand.of(writeRequest))))
        )
    }
{code}

In other words:
* Instead of raftGroupService, replicaService should be used.
* ReplicaService uses messages (actionRequests) instead of raft commands.
* Within the scope of RW transactions, replicaService always sends requests to the *primary* replica; within RO transactions, however, non-primary replicas will also participate in request handling, so I believe we should introduce a common Replica instead of a strict PrimaryReplica.
* A Replica is aware of the request-handling logic and processes actions in a way similar to what raftGroupListener does; that means that, in addition to RaftGroupListener, a ReplicaListener will be introduced (personally, I don't like the term Listener here).

h2. TODO

Within the scope of this ticket, interfaces for Replica, ReplicaService and ReplicaListener should be introduced; a possible shape of these interfaces is sketched after the list below. They should be sufficient to replace direct interaction with raft. Please pay attention that, from the service point of view, we will need methods for:
* communication with a dedicated replica instance, e.g. invoke should have an overloaded version with a nodeId parameter (it is not yet clear whether that will be the nodeId itself, a peerId, or some other replicaId locator);
* primary replica detection.
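As a starting point, a minimal sketch of how these interfaces could be shaped is given below. Every name and signature here, including the ReplicaRequest and ReplicaMeta stand-ins and the use of String for group/node identifiers, is an assumption for illustration; the actual API is exactly what this ticket should define:

{code:java}
import java.util.concurrent.CompletableFuture;

/** Marker for replica-level requests; an assumed stand-in replacing raw raft commands. */
interface ReplicaRequest { }

/** Assumed holder of primary-replica metadata (node, lease interval, etc.). */
interface ReplicaMeta { }

/** Server-side request handler; the counterpart of RaftGroupListener. */
interface ReplicaListener {
    CompletableFuture<Object> invoke(ReplicaRequest request);
}

/** A single replica: validates the lease for primary-only requests, then delegates to its listener. */
interface Replica {
    CompletableFuture<Object> processRequest(ReplicaRequest request);
}

/** Client-side facade used instead of raftGroupService. */
interface ReplicaService {
    /** Sends a request to the current primary replica of the given replication group. */
    <R> CompletableFuture<R> invoke(String groupId, ReplicaRequest request);

    /** Overload for a dedicated replica instance; nodeId is only one possible replica locator. */
    <R> CompletableFuture<R> invoke(String nodeId, String groupId, ReplicaRequest request);

    /** Primary replica detection for the given replication group. */
    CompletableFuture<ReplicaMeta> primaryReplica(String groupId);
}
{code}

Note how the two invoke overloads and primaryReplica map one-to-one onto the two bullets above.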