[ 
https://issues.apache.org/jira/browse/SOLR-6463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14139076#comment-14139076
 ] 

Renaud Delbru commented on SOLR-6463:
-------------------------------------

Hi Yonik, All,

Here is a proposal for tracking update progress in CDCR. For the moment, we 
assume the Active-Passive scenario, where a single source cluster forwards 
updates to one or more target clusters. Looking forward to reading your 
feedback on this proposal.

h4. Updates Tracking & Pushing

CDCR replicates data updates from the source to the target Data Center by 
leveraging the Updates Log. A background thread regularly checks the Updates 
Log for new entries, and then forwards them to the target Data Center. The 
thread therefore needs to keep a checkpoint in the form of a pointer to the 
last update successfully processed in the Updates Log. Upon acknowledgement 
from the target Data Center that updates have been successfully processed, the 
Updates Log pointer is updated to reflect the current checkpoint.

This pointer must be synchronized across all the replicas. If the leader goes 
down and a new leader is elected, the new leader will be able to resume 
replication from the last successfully processed update by using this 
synchronized pointer. The strategy to synchronize such a pointer across 
replicas is explained next.

If, for some reason, the target Data Center is offline or fails to process the 
updates, the thread will periodically try to contact the target Data Center and 
push the updates.
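
To make this workflow concrete, here is a minimal sketch of such a forwarder 
thread. All the types (UpdateLogReader, TargetClusterClient, UpdateEntry) are 
hypothetical placeholders used for illustration, not actual Solr APIs; the 
point is only the checkpoint-advance-on-acknowledgement and the retry 
behaviour.

{code:java}
import java.io.IOException;
import java.util.List;

// Sketch only: the types below are hypothetical placeholders, not Solr APIs.
class UpdateEntry {
  final long version;    // unique identifier of the update
  final byte[] payload;  // the serialised update operation
  UpdateEntry(long version, byte[] payload) { this.version = version; this.payload = payload; }
}

interface UpdateLogReader {
  List<UpdateEntry> readAfter(long version) throws IOException; // entries strictly newer than 'version'
}

interface TargetClusterClient {
  void push(List<UpdateEntry> batch) throws IOException; // blocks until the target acknowledges
}

public class UpdatesForwarder implements Runnable {

  private final UpdateLogReader updatesLog;
  private final TargetClusterClient target;
  private volatile long checkpoint; // pointer to the last update acknowledged by the target

  public UpdatesForwarder(UpdateLogReader updatesLog, TargetClusterClient target,
                          long initialCheckpoint) {
    this.updatesLog = updatesLog;
    this.target = target;
    this.checkpoint = initialCheckpoint;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        List<UpdateEntry> batch = updatesLog.readAfter(checkpoint);
        if (!batch.isEmpty()) {
          target.push(batch);                               // wait for the acknowledgement
          checkpoint = batch.get(batch.size() - 1).version; // advance only after acknowledgement
        }
      } catch (IOException e) {
        // Target Data Center offline or failing to process updates:
        // keep the current checkpoint and retry on the next iteration.
      }
      try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
  }
}
{code}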

h4. Synchronization of Update Checkpoints

A reliable synchronization of the update checkpoints between the shard leader 
and shard replicas is critical to avoid introducing inconsistency between the 
source and target Data Centers. Another important requirement is that the 
synchronization must be performed with minimal network traffic to maximize 
scalability.

In order to achieve this, the strategy is to:
* Uniquely identify each update operation. This unique identifier will serve as 
the pointer.
* Rely on two stores: an ephemeral store on the source shard leader, and a 
persistent store on the target cluster.

The shard leader in the source cluster will be in charge of generating a unique 
identifier for each update operation, and will keep in memory a copy of the 
identifier of the last processed update. The identifier will be sent to the 
target cluster as part of the update request. On the target Data Center side, 
the shard leader will receive the update request, store it along with the 
unique identifier in its Updates Log, and replicate it to the other replicas.
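
As a rough illustration of the two stores (with hypothetical names, not Solr 
APIs): the source shard leader only needs an in-memory holder for the 
identifier of the last acknowledged update, while the target side needs no 
dedicated component, since the identifier is persisted as part of the entry 
written into the target's Updates Log.

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: ephemeral pointer kept by the source shard leader. It is lost if
// the leader goes down, and rebuilt by querying the target cluster (see below).
public class SourceCheckpointHolder {

  // Identifier of the last update acknowledged by the target Data Center.
  private final AtomicLong lastProcessedIdentifier = new AtomicLong(-1);

  public void advance(long identifier) {
    // Identifiers are monotonically increasing, so only ever move forward.
    lastProcessedIdentifier.accumulateAndGet(identifier, Math::max);
  }

  public long current() {
    return lastProcessedIdentifier.get();
  }
}
{code}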

SolrCloud already provides a unique identifier for each update operation, 
i.e., a “version” number. This version number is generated using a time-based 
Lamport clock, which is incremented for each update operation sent. This 
provides a “happened-before” ordering of the update operations that will be 
leveraged in (1) the initialisation of the update checkpoint on the source 
cluster, and (2) the maintenance strategy of the Updates Log.
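
For concreteness, here is a sketch of such a time-based Lamport clock. Solr's 
version numbers follow roughly this scheme (wall-clock milliseconds in the high 
bits, a counter in the low bits), but the exact 20-bit layout below should be 
read as an assumption of the sketch rather than a reference to the actual 
implementation.

{code:java}
// Sketch of a time-based Lamport clock for generating version numbers.
public class VersionClock {

  private long highestIssued = 0;

  // Returns a monotonically increasing version number: high bits derived from the
  // wall clock, low bits acting as a counter within the same millisecond.
  public synchronized long newVersion() {
    long candidate = System.currentTimeMillis() << 20;
    highestIssued = Math.max(candidate, highestIssued + 1);
    return highestIssued;
  }

  // Never lets the clock go backwards; used later by the clock synchronisation
  // procedure proposed for delete-by-query.
  public synchronized void advanceTo(long externalVersion) {
    highestIssued = Math.max(highestIssued, externalVersion);
  }
}
{code}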

The persistent store on the target cluster is used only during the election of 
a new shard leader on the source cluster. If a shard leader goes down on the 
source cluster and a new leader is elected, the new leader will contact the 
target cluster to retrieve the last update checkpoint and instantiate its 
ephemeral pointer. Upon such a request, the target cluster will retrieve the 
latest identifier received across all the shards and send it back to the 
source cluster. To retrieve the latest identifier, every shard leader will look 
up the identifier of the first entry in its Updates Log and send it back to a 
coordinator. The coordinator will then select the highest among them.
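
A sketch of that retrieval on the target cluster, assuming a hypothetical 
ShardLeaderClient helper for querying each shard leader; the coordinator simply 
takes the maximum of the reported identifiers.

{code:java}
import java.io.IOException;
import java.util.List;

// Sketch only: ShardLeaderClient is a hypothetical helper, not a Solr API.
interface ShardLeaderClient {
  // Identifier (version) of the first entry in this shard leader's Updates Log.
  long firstEntryVersion() throws IOException;
}

public class TargetCheckpointCoordinator {

  private final List<ShardLeaderClient> shardLeaders;

  public TargetCheckpointCoordinator(List<ShardLeaderClient> shardLeaders) {
    this.shardLeaders = shardLeaders;
  }

  // Collect the first-entry identifier from every shard leader and return the
  // highest one; this is the checkpoint sent back to the newly elected source leader.
  public long latestCheckpoint() throws IOException {
    long latest = -1;
    for (ShardLeaderClient leader : shardLeaders) {
      latest = Math.max(latest, leader.firstEntryVersion());
    }
    return latest;
  }
}
{code}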

This strategy does not require any additional network traffic and ensures 
reliable pointer synchronization. Consistency is principally achieved by 
leveraging SolrCloud. The update workflow of SolrCloud ensures that every 
update is applied to the leader as well as to all of the replicas. If the 
leader goes down, a new leader is elected. During the leader election, a 
synchronization is performed between the new leader and the other replicas. As 
a result, the new leader’s Updates Log is consistent with that of the previous 
leader. Having a consistent Updates Log means that:
* On the source cluster, the update checkpoint can be reused by the new leader.
* On the target cluster, the update checkpoint will be consistent between the 
previous and the new leader. This ensures the correctness of the update 
checkpoint sent back by a newly elected leader on the target cluster.

h6. Impact of Solr’s Update Reordering

The Updates Log can differ between the leader and the replicas, but not in an 
inconsistent way. During leader-to-replica synchronisation, Solr’s Distributed 
Update Processor takes care of reordering the update operations based on their 
version number, and drops any operations that are duplicates or could cause 
inconsistency. One of the consequences is that the target cluster can send back 
to the source cluster identifiers that no longer exist. However, given that the 
identifier is an incremental version number, the update checkpoint on the 
source cluster can be set to the next existing version number without 
introducing inconsistency.
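
Since version numbers are totally ordered within a shard, resolving a 
checkpoint that no longer exists in the Updates Log is a simple lookup. A small 
sketch, assuming the log can expose its current versions in ascending order:

{code:java}
// Sketch: move a reported checkpoint forward to the next existing version number
// if the exact version has been dropped from the source Updates Log.
public final class CheckpointResolver {

  private CheckpointResolver() {}

  public static long resolve(long reportedCheckpoint, long[] ascendingLogVersions) {
    for (long version : ascendingLogVersions) {
      if (version >= reportedCheckpoint) {
        return version; // exact match, or the next existing version number
      }
    }
    // Everything in the log is older than the reported checkpoint: keep it as-is.
    return reportedCheckpoint;
  }
}
{code}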

h6. Replication Between Clusters with Different Topology

The current design also works in scenarios where replication is performed 
between clusters with different topologies, e.g., one source cluster with two 
shards and a target cluster with four shards. However, there is one limitation 
due to a clock skew (version number) problem across shards.

In such a scenario, a target shard can receive updates from multiple source 
shards (as the document ids will be redistributed across shards due to the 
different cluster topology). This means that the version numbers generated by 
the source cluster would have to be global to the cluster in order to keep a 
partial ordering of the updates. However, the version number is local to a 
shard. Given that a clock skew across shards is likely, a target shard will 
receive updates with duplicate or non-ordered version numbers. This does not 
really cause problems for add and delete-by-id operations, since the local 
version number replicated to the target cluster is enough to keep a partial 
ordering for a given document identifier. However, it does cause issues for 
delete-by-query operations:
* When a cluster receives a delete-by-query, it is forwarded to each shard 
leader.
* Each shard leader will assign a version number (which can end up being 
different between shard leaders) to its delete-by-query, and replicate the 
delete-by-query to all the target shard leaders.
* A target shard leader will therefore receive more than one delete-by-query, 
with possibly different version numbers. It will not be possible to properly 
de-duplicate and reorder these delete-by-query operations.

One way to solve the delete-by-query problem would be to run a clock 
synchronisation procedure when a delete-by-query is received, before the leader 
forwards the delete-by-query to the other leaders. The workflow would look like 
the following:
* A leader receives a delete-by-query.
* This (primary) leader requests a clock synchronisation across the cluster 
(i.e., among the other leaders). The clock is synchronised by using the highest 
version number across all the leaders.
* At this stage, the primary leader can assign a version number to the 
delete-by-query and forward it to the other leaders.
* The secondary leaders do not overwrite the version number attached to the 
delete-by-query, but reuse it.

The delete-by-query command will then have the same version number across all 
the leaders. When the leaders replicate the commands to the target Data Center, 
it becomes possible to keep the partial ordering, since the source leaders have 
been synchronised and all the delete-by-query commands carry the same version.

Therefore, the problem boils down to how to implement a clock synchronisation 
procedure. Here is an initial proposal. Given that a synchronisation will be 
done rarely (only in the case of a delete-by-query), performance might not be 
critical for its implementation. A possible solution would be a 2-phase 
communication approach (sketched after this list), where the primary leader 
initiates the clock synchronisation protocol and requests each secondary 
leader to:
* Block/buffer updates
* Send its latest version number to the primary leader
* Await the answer of the primary leader with the new clock
* Synchronise its clock
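
A rough sketch of this two-phase exchange from the primary leader's point of 
view, reusing the VersionClock from the earlier sketch and assuming a 
hypothetical LeaderClient for leader-to-leader calls. Failure handling (e.g., a 
secondary leader that never answers and is left buffering updates) is 
deliberately left out, which is exactly where things would get tricky.

{code:java}
import java.io.IOException;
import java.util.List;

// Sketch only: LeaderClient is a hypothetical leader-to-leader helper, not a Solr API.
interface LeaderClient {
  long blockUpdatesAndGetVersion() throws IOException;              // phase 1: buffer updates, report latest version
  void synchroniseClockAndResume(long newClock) throws IOException; // phase 2: adopt clock, resume updates
}

public class DeleteByQueryClockSync {

  private final List<LeaderClient> secondaryLeaders;
  private final VersionClock localClock; // the primary leader's own clock (earlier sketch)

  public DeleteByQueryClockSync(List<LeaderClient> secondaryLeaders, VersionClock localClock) {
    this.secondaryLeaders = secondaryLeaders;
    this.localClock = localClock;
  }

  // Returns the version number to attach to the delete-by-query. After this call
  // every leader shares the same clock, so the secondaries reuse the attached
  // version instead of assigning their own.
  public long synchroniseAndVersion() throws IOException {
    // Phase 1: every secondary leader blocks/buffers updates and reports its clock.
    long highest = localClock.newVersion();
    for (LeaderClient leader : secondaryLeaders) {
      highest = Math.max(highest, leader.blockUpdatesAndGetVersion());
    }
    // Phase 2: distribute the highest clock; the secondaries synchronise and resume.
    localClock.advanceTo(highest);
    for (LeaderClient leader : secondaryLeaders) {
      leader.synchroniseClockAndResume(highest);
    }
    // The delete-by-query gets a version issued after the synchronised clock.
    return localClock.newVersion();
  }
}
{code}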

This is far from perfect, as things might become tricky if there are network or 
communication problems, but it is an initial idea to start the discussion.

> track update progress
> ---------------------
>
>                 Key: SOLR-6463
>                 URL: https://issues.apache.org/jira/browse/SOLR-6463
>             Project: Solr
>          Issue Type: Sub-task
>            Reporter: Yonik Seeley
>
> Update progress needs to be tracked so that catch-up can be done after a 
> network outage between clusters.


