[ https://issues.apache.org/jira/browse/SOLR-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yonik Seeley updated SOLR-13431:
--------------------------------
    Description: 
h2. Background & problem statement:

With shared storage support, data durability is handled by the storage layer 
(e.g. S3 or HDFS) and replicas are not needed for durability. This changes the 
nature of how a single update (say adding a document) must be handled. The 
local transaction log does not help... a node can go down and never come back. 
The implication is that *a commit must be done for any updates to be considered 
durable.*

The problem is also more complex than just batching updates and adding a commit 
at the end of a batch. Consider indexing documents A,B,C,D followed by a commit:
 1) documents A,B sent to leader1 and indexed
 2) leader1 fails, leader2 is elected
 3) documents C,D sent to leader2 and indexed
 4) commit
 After this sequence of events, documents A,B are actually lost because a 
commit was not done on leader1 before it failed.

Adding a commit for every single update would fix the problem of data loss, but 
would obviously be too expensive (and each commit will be more expensive on 
shared storage). We can still do batches if we *disable transparent failover* 
for a batch.
 - all updates in a batch (for a specific shard) should be indexed on the *same 
leader*... any change in leadership should result in a failure at the low level 
instead of any transparent failover or forwarding.
 - in the event of a failure, *all updates since the last commit must be 
replayed* (we can't just retry the failure itself), or the failure will need to 
be bubbled up to a higher layer to retry from the beginning.

h2. Indexing scenario 1: CSV upload

If SolrCloud is loading a large CSV file, the receiving Solr node will forward 
updates to the correct leaders. This happens in the DistributedUpdateProcessor 
via SolrCmdDistributor, which ends up using a ConcurrentUpdateHttp2SolrClient 
subclass.

Fixing this scenario for shared storage in the simplest way would entail adding 
a commit to every update, which would be far too slow.
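
For reference, a minimal SolrJ sketch of a client kicking off such a CSV load (the base URL, collection name, and file path are illustrative placeholders, not part of this issue):

{code:java}
import java.io.File;

import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;

public class CsvUpload {
  public static void main(String[] args) throws Exception {
    // Point at any node; that node forwards rows to the correct shard leaders.
    try (Http2SolrClient client =
        new Http2SolrClient.Builder("http://localhost:8983/solr/myCollection").build()) {
      ContentStreamUpdateRequest req = new ContentStreamUpdateRequest("/update/csv");
      req.addFile(new File("data.csv"), "text/csv");
      // Single commit at the end: with shared storage, rows indexed on a leader
      // that fails before this commit would be lost, as described above.
      req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
      client.request(req);
    }
  }
}
{code}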

The forward-to-replica use case here is quite different from the 
forward-to-correct-leader case (the latter has the current Solr node acting much 
more like an external client). To simplify development, we may want to 
separate these cases and continue using the existing code for 
forward-to-replica. 

h2. Indexing scenario 2: SolrJ bulk indexing

In this scenario, a client is trying to do a large amount of indexing and can 
use batches or streaming. For this scenario, we could just require that a 
commit be added for each batch and then fail a batch on any leader change. This 
is problematic for a couple of reasons:
 - larger batches take longer to build, adding latency and hurting throughput
 - doesn't scale well - as a collection grows, the number of shards grows and 
the chance that any shard leader goes down (or the shard is split) goes up. 
Requiring that the entire batch (all shards) be replayed when this happens is 
wasteful and gets worse with collection growth.
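
For concreteness, the per-batch-commit approach described above would look roughly like the following SolrJ sketch (ZooKeeper address, collection name, and batch size are placeholders); under this scheme, any leader change mid-batch fails the batch and the whole thing must be resent:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class PerBatchCommitIndexer {
  public static void main(String[] args) throws Exception {
    try (CloudSolrClient client =
        new CloudSolrClient.Builder(List.of("localhost:2181"), Optional.empty()).build()) {
      List<SolrInputDocument> batch = new ArrayList<>();
      for (int i = 0; i < 100_000; i++) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", Integer.toString(i));
        batch.add(doc);
        if (batch.size() == 1000) {
          client.add("myCollection", batch);
          // The commit is what makes the batch durable on shared storage; on a
          // leader change, everything since the previous commit must be resent.
          client.commit("myCollection");
          batch.clear();
        }
      }
      if (!batch.isEmpty()) {
        client.add("myCollection", batch);
        client.commit("myCollection");
      }
    }
  }
}
{code}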

h2. Proposed Solution: a SolrJ cloud aware streaming client
 - something like ConcurrentUpdateHttp2SolrClient that can stream and know 
about cloud layout
 - track when last commit happened for each shard leader
 - buffer updates per-shard since the last commit happened
 -- doesn't have to be exact... assume idempotent updates here, so overlap is 
fine
 -- buffering would also be triggered by the replica type of the collection (so 
this class could be used for both shared storage and normal NRT replicas) 
 - a parameter would be passed that would disallow any forwarding (since we're 
handling buffering/failover at this level)
 - on a failure because of a leader going down or loss of leadership, wait 
until a new leader has been elected and then replay updates since the last 
commit (see the sketch after this list)
 - insert commits where necessary to prevent buffers from growing too large
 -- inserted commits should be able to proceed in parallel... we shouldn't need 
to block and wait for a commit before resuming sending documents to that leader.
 -- it would be nice if there was a way we could get notified if a commit 
happened via some other mechanism (like an autoCommit being triggered)
  --- assuming we can't get this, perhaps we should pass a flag that disables 
triggering auto-commits for these batch updates?
 - handle splits (not only can a shard leader change, but a shard could 
split... buffered updates may need to be re-slotted)
 - need to handle a leader "bounce" like a change in leadership (assuming we're 
skipping using the transaction log)
 - multi-threaded - all updates to a leader regardless of thread are managed as 
a single update stream
 -- this perhaps provides a way to coalesce incremental/realtime updates
 - OPTIONAL: ability to have multiple channels to a single leader?
 -- we would need to avoid reordering updates to the same ID
 -- an alternative to attempting to create more parallelism-per-shard on the 
client side is to do it on the server side.
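
A rough, hypothetical sketch of the per-shard buffer-and-replay core described above (not actual Solr code: the class and method names are invented, and shard routing, leader lookup, transport, threading, and parallel commits are left as stubs or simplified):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.solr.common.SolrInputDocument;

/** Sketch: buffer updates per shard since that shard's last commit and replay
 *  the buffer whenever the shard's leader changes. */
abstract class ShardBufferingUpdateClient {

  /** Uncommitted (idempotent) updates per shard. */
  private final Map<String, List<SolrInputDocument>> uncommitted = new HashMap<>();

  /** Placeholder: route a document to its shard using the cloud layout. */
  protected abstract String shardFor(SolrInputDocument doc);

  /** Placeholder: send directly to the current shard leader with forwarding disallowed. */
  protected abstract void sendToLeader(String shard, SolrInputDocument doc)
      throws LeaderChangedException;

  /** Placeholder: commit a single shard (a real implementation would not block on this). */
  protected abstract void commitShard(String shard);

  public void add(SolrInputDocument doc) {
    String shard = shardFor(doc);
    List<SolrInputDocument> buffer = uncommitted.computeIfAbsent(shard, s -> new ArrayList<>());
    buffer.add(doc);
    try {
      sendToLeader(shard, doc);
    } catch (LeaderChangedException e) {
      // A new leader was elected (or the leader bounced): resend everything
      // indexed since the last commit, since it was never made durable.
      replay(shard);
    }
    // Insert a commit when the buffer grows too large, then drop the buffer.
    if (buffer.size() >= 10_000) {
      commitShard(shard);
      buffer.clear();
    }
  }

  /** Replay all uncommitted updates for a shard against its new leader. */
  private void replay(String shard) {
    for (SolrInputDocument doc : uncommitted.get(shard)) {
      try {
        sendToLeader(shard, doc);
      } catch (LeaderChangedException again) {
        throw new RuntimeException("Leader changed again during replay of shard " + shard, again);
      }
    }
  }

  static class LeaderChangedException extends Exception {}
}
{code}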

> Efficient updates with shared storage
> -------------------------------------
>
>                 Key: SOLR-13431
>                 URL: https://issues.apache.org/jira/browse/SOLR-13431
>             Project: Solr
>          Issue Type: New Feature
>      Security Level: Public(Default Security Level. Issues are Public) 
>            Reporter: Yonik Seeley
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
