[ 
https://issues.apache.org/jira/browse/SOLR-14339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bilal Waheed updated SOLR-14339:
--------------------------------
    Description: 
*Problem*

The source of truth for a SHARED replica lives in the shared store. At any given 
point in time, a SHARED replica can locally be missing some or all of its 
shard's indexing, and the amount missing is arbitrary. This is true even for a 
leader replica, because we do not update a SHARED replica from the shared store 
at startup, recovery, leader election, etc. All this means that a query served 
by a SHARED replica can be blind to an arbitrary amount of indexing.

There are a few requirements we would like to support.
 # Query visibility into whatever has been indexed so far is not stale beyond 
some well-defined SLA.
 # A client can issue a Solr query that has complete visibility into whatever 
has been indexed so far. This is primarily needed for tests.
 # Solr query results carry information about the freshness of the index used 
to serve the query.
 # Scalability: we would like to eventually support a high number (~10K) of 
shared-store-based collections per Solr cluster, with the understanding that a 
large majority (~80%) will not be used most of the time, and the ones that are 
used will go through a daily usage cycle (less usage at night). In other words, 
we do not want to proactively refresh a SHARED replica from the shared store 
unless it is needed for querying or indexing.

Indexing is already covered: we synchronously ensure that the leader replica is 
up to date with the shared store before proceeding with a new indexing batch. 
But we need a solution for queries.
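
For context, here is a sketch of what that indexing-path guarantee amounts to, reusing the helper names from the pseudo code below (synchronousPullFromSharedStore is a hypothetical name, not the actual method):

{code:java}
// Indexing path (already in place): before accepting a new batch, the leader
// verifies it is up to date with the shared store, blocking if it must pull.
String cachedSuffix = getMetadataSuffixFromCache(r);
String suffixInZK = getMetadataSuffixFromZK(s);
if (!cachedSuffix.equals(suffixInZK)) {
  // Unlike the query path below, this pull is synchronous: indexing waits.
  synchronousPullFromSharedStore(r);
}
// ...apply the indexing batch, push to the shared store, update metadataSuffix in ZK...
{code}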

*Proposal*

Two duration configuration values, *freshnessTolerance* and *refreshInterval*, 
are introduced. Both are defined in seconds. Each SHARED replica tracks (in JVM 
memory) its last refresh time (LRT): the last time it was brought up to date 
with the index data in the shared store, i.e. the last time it read 
metadataSuffix^[1]^ from ZooKeeper to evaluate its freshness, followed by a 
pull from the shared store when needed. The time captured immediately before 
reading the metadataSuffix becomes the LRT. If there is nothing to pull, we 
simply update the replica's LRT; if there is something to pull, the LRT is 
updated only after the pull finishes successfully. The leader also updates its 
LRT after each successful indexing batch; in that case the time just before the 
update of metadataSuffix in ZK becomes the LRT.
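
To make the LRT bookkeeping concrete, here is a minimal sketch (all names are illustrative, not the actual implementation) of the in-JVM state a SHARED replica could keep. The key detail is that the timestamp captured before reading metadataSuffix is what becomes the LRT, and a pull that finishes late must not move the LRT backwards:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

class ReplicaFreshnessState {
  // Last refresh time (LRT) in epoch milliseconds; 0 means never refreshed.
  private final AtomicLong lastRefreshTimeMs = new AtomicLong(0);

  // Called when the ZK metadataSuffix matched our cache (nothing to pull),
  // or after a pull from the shared store completed successfully.
  // candidateLrt was captured immediately BEFORE reading metadataSuffix
  // from ZooKeeper, which keeps the LRT conservative.
  void onRefreshed(long candidateLrt) {
    // Only move the LRT forward; a slow pull must not clobber a newer
    // refresh recorded in the meantime.
    lastRefreshTimeMs.accumulateAndGet(candidateLrt, Math::max);
  }

  long lastRefreshTimeMs() {
    return lastRefreshTimeMs.get();
  }
}
{code}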

A replica whose LRT is no more than *freshnessTolerance* ago (compared to the 
query's arrival time) is allowed to serve the query. If the LRT is older than 
that, we check whether there is any new indexing to pull. If there is none, we 
update the replica's LRT and serve the query. If there is new indexing to pull, 
we enqueue a pull from the shared store and fail the query immediately with a 
"*Replica is not fresh enough to serve the query*" error.

SolrCloud already has logic such that if one replica fails to serve a query, it 
tries the other available replicas of the shard. If none of the replicas of a 
shard are fresh enough to serve the query, Solr will return a 
*NoReplicaIsFreshEnough* error.

The *refreshInterval* config denotes a shorter duration than 
freshnessTolerance. A replica receiving a continuous stream of queries will 
refresh itself at least every refreshInterval.

Clients will have the option to *override freshnessTolerance on a per-query 
basis*, i.e. it can be passed as a query parameter. For complete visibility 
(serve the query with everything that has been indexed so far) the client 
passes 0 for freshnessTolerance.
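
As a client-side usage sketch in SolrJ, assuming the override is exposed as a plain request parameter named freshnessTolerance (the parameter name and how it is exposed are assumptions, not a finalized API):

{code:java}
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr").build();
SolrQuery query = new SolrQuery("*:*");
// 0 = complete visibility: the replica must be fully caught up with the
// shared store before serving (useful in tests).
query.set("freshnessTolerance", "0");
QueryResponse rsp = client.query("myCollection", query);
{code}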

If a replica serves the query, it returns the replica's *timeSinceLastRefresh* 
duration (in milliseconds) in the query response. If the query fans out to 
multiple shards (replicas), the query's timeSinceLastRefresh is the maximum of 
the timeSinceLastRefresh values of all replicas that contributed to it. Because 
of clock skew between client and Solr, a timestamp of the last refresh would 
not carry much meaning; therefore we return the elapsed time since the last 
refresh instead.
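
As a sketch of the fanout merge, assuming the per-replica value is surfaced in each shard response under a key named timeSinceLastRefresh (the key name and the SearchComponent placement are assumptions):

{code:java}
// Hypothetical merge step on the coordinating node, e.g. in a
// SearchComponent's finishStage: the query's freshness is the freshness of
// the least-fresh replica that contributed to it.
long maxTimeSinceLastRefresh = 0;
for (ShardResponse srsp : shardResponses) {
  Long t = (Long) srsp.getSolrResponse().getResponse().get("timeSinceLastRefresh");
  if (t != null) {
    maxTimeSinceLastRefresh = Math.max(maxTimeSinceLastRefresh, t);
  }
}
rb.rsp.add("timeSinceLastRefresh", maxTimeSinceLastRefresh);
{code}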

[1] *metadataSuffix*: A collection can be made up of multiple shards, and each 
shard can have multiple replicas. For each shard only one replica acts as the 
leader, accepting indexing and pushing to the shared store. Failures can 
happen, so an old leader and a new leader can be working at the same time. To 
keep the index correct, we write a unique string to ZooKeeper at the end of 
each successful indexing batch. We call that unique string metadataSuffix; it 
is per-shard metadata. What matters here is not how it is used but that it 
exists and that, for each shard, it is the source of truth for the last 
successful indexing batch. Each replica keeps a JVM cache of the metadataSuffix 
version of the index it is at.
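
To illustrate how a unique suffix guards against an old and a new leader racing, here is a sketch using the raw ZooKeeper API (the znode path is made up, and the real implementation goes through Solr's ZK abstractions):

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// zk: a connected ZooKeeper handle; newSuffix: this batch's fresh unique string.
String path = "/collections/myCollection/shard1/metadataSuffix"; // illustrative path
Stat stat = new Stat();
byte[] currentSuffix = zk.getData(path, false, stat); // also records the znode version

// ... push the batch's index files to the shared store ...

try {
  // Conditional write: fails if any other leader updated the suffix in between.
  zk.setData(path, newSuffix.getBytes(StandardCharsets.UTF_8), stat.getVersion());
} catch (KeeperException.BadVersionException e) {
  // Lost the race: do not ack the batch; resync from the shared store
  // instead of overwriting the other leader's work.
}
{code}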

*Pseudo code*

 
{code:java}
// A SolrQueryRequest q has arrived for replica r of shard s.

// Using the query arrival time is conservative: if we could somehow subtract
// network time from it, our logic would still be correct.
long queryArrivalTime = q.getStartTime();
long lastRefreshTime = getLastRefreshTime(r);

// A per-query override takes precedence over the configured default.
Long freshnessToleranceParameter = getFreshnessToleranceParameter(q);
long acceptableFreshnessTolerance = freshnessToleranceParameter != null ?
                    freshnessToleranceParameter : getFreshnessTolerance();
acceptableFreshnessTolerance = TimeUnit.MILLISECONDS.convert(
                    acceptableFreshnessTolerance, TimeUnit.SECONDS);
long minimumAcceptableLastRefreshTime = queryArrivalTime -
                    acceptableFreshnessTolerance;
if (lastRefreshTime >= minimumAcceptableLastRefreshTime) {
  // Fresh enough to serve, but enqueue a background pull if the replica has
  // not refreshed within refreshInterval, so steady traffic keeps it warm.
  long refreshInterval = TimeUnit.MILLISECONDS.convert(getRefreshInterval(),
                TimeUnit.SECONDS);
  if ((lastRefreshTime + refreshInterval) < System.currentTimeMillis()) {
    enqueuePullFromSharedStore(r);
  }
} else {
  String metadataSuffixInCache = getMetadataSuffixFromCache(r);
  // Capture the candidate LRT BEFORE reading from ZooKeeper, to stay conservative.
  long newLastRefreshTime = System.currentTimeMillis();
  String metadataSuffixInZK = getMetadataSuffixFromZK(s);
  if (metadataSuffixInCache.equals(metadataSuffixInZK)) {
    // No new indexing since the last refresh: just update the last refresh
    // time and we are fresh enough to serve the query.
    updateLastRefreshTime(r, newLastRefreshTime);
  } else {
    enqueuePullFromSharedStore(r);
    throw new SolrException(ErrorCode.SERVER_ERROR,
        "Replica is not fresh enough to serve the query");
  }
}

// serve the query...
{code}

*Minimizing NoReplicaIsFreshEnough errors*

Some steady-state scenarios this proposal covers without failing the query with 
a NoReplicaIsFreshEnough error:
 * A replica getting a continuous query stream will keep serving it.
 * The query stream is not continuous and there is no query activity for a 
freshnessTolerance duration.
 ** If there is no new indexing since the last query, then any replica that has 
been queried before will be able to serve the query.
 ** If there is new indexing, then only the leader replica will be able to 
serve the query (assuming the leader has not failed after indexing).

There are still scenarios where we will run into the NoReplicaIsFreshEnough 
error:
 * Cold start: the shard was not active (no indexing, no querying).
 * Deployments doing a rolling restart of all the replicas.
 * There was no query activity for a freshnessTolerance duration, then some 
indexing happened, and the leader died after indexing.

Minimizing NoReplicaIsFreshEnough errors is out of scope for this Jira; there 
will be a separate Jira for that.



> SHARED replica's freshness
> --------------------------
>
>                 Key: SOLR-14339
>                 URL: https://issues.apache.org/jira/browse/SOLR-14339
>             Project: Solr
>          Issue Type: Sub-task
>          Components: SolrCloud, SolrJ
>            Reporter: Bilal Waheed
>            Priority: Major


