Re: NRT segment replication in AWS

2025-02-26 Thread Steven Schlansker



> On Feb 26, 2025, at 2:53 PM, Marc Davenport wrote:
> 
> Hello,
> Our current search solution is a pretty big monolith running on pretty
> beefy EC2 instances.  Every node is responsible for indexing and serving
> queries.  We want to start decomposing our service and are starting with
> separating the indexing and query handling responsibilities.

We run a probably comparatively small but otherwise similar installation, using
Google Kubernetes instances. We just use a persistent disk instead of an
elastic store, but would also consider using something like S3 in the future.

> I'm in the research phases now trying to collect any prior art I can. The
> rough sketch is to implement the two NRT replication node classes on their
> respective services and use S3 as a distribution point for the segment
> files.  I'm still debating if there should be some direct knowledge of the
> replicas in the primary node.

We tried to avoid the primary keeping any durable state about replicas,
as replicas tend to disappear for any or no reason in a cloud environment.
A specific example of an issue we ran into: we disable the
'waitForAllRemotesToClose' step entirely:
https://github.com/apache/lucene/pull/11822

> Or if the primary node can just churn away
> creating base indexes and updates and publish to a queue when it produces a
> new set of segments. The replicas are then free to pick up the latest
> index as they spin up and subscribe to changes for it. It seems like having
> the indexer also be responsible for communicating with the replicas
> would be double duty for that system.  I'd love to hear other experiences
> if people can share them or point to writings about them they read when
> designing their systems.

In our case, the primary keeps in memory the latest CopyState that any replica
should take. Replicas call an HTTP API in an infinite loop, passing in their
current version and asking if any newer version is available.
If the replica is behind, the primary gives it the current CopyState NRT point
immediately.
If the replica is caught up, we hold the request in a long poll for just short
of our HTTP timeout, waiting for a new version to become available; otherwise
we return "no update for now, try again".

Once the replica receives an updated CopyState, it feeds it into the
ReplicaNode with newNRTPoint, which starts the file copy.
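The long-poll exchange described above can be sketched as an in-process simulation; the class and method names here are illustrative, not from our codebase:

```python
import threading
import time


class PrimaryState:
    """Holds the latest published version and its CopyState payload."""

    def __init__(self):
        self._cond = threading.Condition()
        self._version = 0
        self._copy_state = None

    def publish(self, version, copy_state):
        """Primary publishes a new NRT point, waking any parked long-polls."""
        with self._cond:
            self._version = version
            self._copy_state = copy_state
            self._cond.notify_all()

    def poll(self, replica_version, timeout_s):
        """Replica's long-poll: return (version, copy_state) if a newer
        version exists; otherwise wait just short of the HTTP timeout and
        return None ("no update for now, try again")."""
        deadline = time.monotonic() + timeout_s
        with self._cond:
            while self._version <= replica_version:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None
                self._cond.wait(remaining)
            return (self._version, self._copy_state)
```

A replica behind the primary gets an answer immediately; a caught-up replica blocks until either a publish or the timeout.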

There's a bit of magic we devised internally around retaining the CopyState 
reference count, as this is stateful and the HTTP replication is mostly 
stateless.
We keep this simple by having only a single primary node at any given time.

Your idea of using a queue instead is interesting, but not something we looked
at extensively :)

> I've looked at nrtsearch from yelp and they seem to let the primary node
> have direct knowledge of the replicas.  That makes sense since it is based
> on McCandless's LuceneServer.
> 
> I know that Amazon internally uses Lucene and has indexing separated from
> query nodes and that they re-index and publish completely new indexes with
> every release to prod. I've been watching what I can of the great videos of
> Sokolov, McCandless, & Froh etc. But they don't show much behind the
> curtain (understandably) about the details of keeping things in sync.   If
> someone does know of a publicly available video or resource that describes
> this, I'd love to see it.

Unfortunately our journey with the LuceneServer was basically informed by the
code and blog posts you've likely already seen, plus a bit of help from this
mailing list; otherwise, there's not a ton of information out there or
evidence of widespread use.
That said, we've been very happy with our setup, despite needing to put in a
fair amount of elbow grease and attention to low-level details to get things
working for us.


-
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org



Re: NRT segment replication in AWS

2025-02-26 Thread Michael Froh
Hi there,

I'm happy to share some details about how Amazon Product Search does its
segment replication. I haven't worked on Product Search in over three
years, so anything that I remember is not particularly novel. Also, it's
not really secret sauce -- I would have happily talked about it more in the
2021 re:Invent talk that Mike Sokolov and I did, but we were trying to keep
within our time limit. :)

That model doesn't exactly have direct communication between primary and
replica (which is generally a good practice in a cloud-based solution --
the fewer node-to-node dependencies, the better). The flow (if I recall
correctly) is driven by a couple of side-car components for the writers and
searchers and is roughly like this:

1. At a specified (pretty coarse) interval, the writer side-car calls a
"create checkpoint" API on the writer to ask it to write a checkpoint.
2. The writer uploads new segment files to S3, and a metadata object
describing the checkpoint contents (which probably includes segments from
an earlier checkpoint, since they can be reused).
3. The writer returns the S3 URL for the metadata object to its side-car.
4. The writer side-car publishes the metadata URL to "something" -- see
below for details.
5. The searcher side-cars all read the metadata URL from "something" -- see
below for details.
6. The searcher side-cars each call a "use checkpoint" API on their local
searchers.
7. The searchers each download the new segment files from S3 and open new
IndexSearchers.
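The seven steps above can be walked through with plain dicts standing in for S3 and for the step-4/5 publish channel. All names here are illustrative, not Amazon's actual APIs:

```python
# In-memory stand-ins for S3 and the publish channel ("something").
s3 = {}
published = []  # metadata URLs, newest last


def writer_checkpoint(checkpoint_id, segment_files):
    """Steps 1-4: upload new segment files, write a metadata object
    describing the checkpoint, and publish the metadata URL."""
    for name, data in segment_files.items():
        s3["segments/" + name] = data
    meta_url = "metadata/%d.json" % checkpoint_id
    s3[meta_url] = {"segments": sorted(segment_files)}
    published.append(meta_url)
    return meta_url


def searcher_refresh(current_url):
    """Steps 5-7: pick up the latest metadata URL and download the
    segment files it references; no-op if already caught up."""
    if not published or published[-1] == current_url:
        return current_url, None
    meta_url = published[-1]
    files = {n: s3["segments/" + n] for n in s3[meta_url]["segments"]}
    return meta_url, files
```

Because the metadata object lists every segment in the checkpoint, unchanged segments from earlier checkpoints are simply referenced again rather than re-uploaded.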

For the details of steps 4 and 5, I don't actually remember how it worked,
but I have two pretty good guesses from what I remember of the overall
architecture:

1. DynamoDB: This is the more likely mechanism. Each index shard has a
unique ID which serves as a partition key in DynamoDB and there's a
sequence number as a sort key. The writer side-car inserts a DynamoDB
record with the next sequence number and the metadata URL. The searcher
side-car periodically fetches 1 record with the partition key by descending
sequence number (i.e. get latest sequence entry for the partition key). If
the sequence number has increased, then call the searcher's use-checkpoint
API.
2. Kinesis: This feels like the less likely mechanism, but I guess it could
work. The writer side-car writes the metadata URL to a Kinesis stream. Each
searcher side-car reads from the Kinesis stream and passes the metadata URL
to the searcher. I'm pretty sure we didn't have one Kinesis stream per
index shard, because managing (and paying for) that many Kinesis streams
would be a pain. Even with a sharded Kinesis stream, you'd "leak" some
checkpoints across index shards, leading to data that the searcher
side-cars would throw away. Also, each Kinesis stream shard has a limited
number of concurrent consumers, which would mean that the number of search
replicas would be limited. I'm *pretty sure* we used the DynamoDB approach.
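The DynamoDB variant's read path can be modeled in memory; in a real table this would be a boto3 Query with `ScanIndexForward=False, Limit=1`, but the functions below are a hypothetical sketch of the logic only:

```python
def latest_checkpoint(table, shard_id):
    """Newest record for a shard: the analogue of a DynamoDB Query on
    the partition key, sorted descending by sequence number, limit 1."""
    rows = [r for r in table if r["shard_id"] == shard_id]
    return max(rows, key=lambda r: r["seq"]) if rows else None


def searcher_tick(table, shard_id, known_seq, use_checkpoint):
    """One periodic poll by the searcher side-car: call the searcher's
    use-checkpoint API only when the sequence number has advanced."""
    rec = latest_checkpoint(table, shard_id)
    if rec is not None and rec["seq"] > known_seq:
        use_checkpoint(rec["metadata_url"])
        return rec["seq"]
    return known_seq
```

The partition key keeps each index shard's history separate, so no searcher ever sees (or discards) another shard's checkpoints, unlike the shared Kinesis stream.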

Another Lucene-based search system that I worked on many years ago at
Amazon had a much simpler architecture:

1. Writer periodically writes new segments to S3.
2. After writing the new segments, the writer writes a metadata object to
S3 with a path like "<writer prefix>/<sequence number>/metadata.json". Because
the writer was guaranteed to be the *only* thing writing with prefixes of
<writer prefix>, it could manage its own dense sequence numbers.
3. A searcher is "sticky" to a writer, and periodically issues an S3
GetObject for the next metadata object's full URL (i.e. the URL using the
next dense sequence number). Until the next checkpoint is written, it gets
a 404 response.
4. Searcher fetches the files referenced by the metadata file.
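The searcher's probe loop (step 3) reduces to a single conditional GetObject; this sketch uses a dict in place of a bucket, with a stand-in exception for S3's 404:

```python
class NotFound(Exception):
    """Stands in for S3's 404 / NoSuchKey response."""


def get_object(bucket, key):
    """Minimal GetObject: return the object or raise NotFound."""
    if key not in bucket:
        raise NotFound(key)
    return bucket[key]


def poll_next(bucket, writer_prefix, next_seq):
    """Probe the next dense sequence number. A 404 simply means the
    writer hasn't published that checkpoint yet; try again later."""
    key = "%s/%d/metadata.json" % (writer_prefix, next_seq)
    try:
        return next_seq + 1, get_object(bucket, key)
    except NotFound:
        return next_seq, None
```

Dense sequence numbers are what make this work: the searcher always knows the exact key of the next checkpoint, so it never has to list the bucket.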

A nice thing about that approach was that it only depended on S3 and only
used PutObject and GetObject APIs, which tend to be more consistent. The
downside was that we needed a separate mechanism for writer discovery and
failover, to let searchers know the correct writer prefix.

Hope that helps! Let me know if you need any other suggestions.

Thanks,
Froh

Re: NRT segment replication in AWS

2025-02-26 Thread Sarthak Nandi
> I'm still debating if there should be some direct knowledge of the
> replicas in the primary node. Or if the primary node can just churn away
> creating base indexes and updates and publish to a queue when it produces a
> new set of segments. The replicas are then free to pick up the latest
> index as they spin up and subscribe to changes for it.

In nrtsearch, the primary uses replica information to:
1. publish a new NRT point
2. pre-copy merged segments

It should be possible to let replicas know of new NRT points using an
external queue.
Pre-copying merged segments can also be done with a queue, but the replicas
would need some way to let the primary know that they are done copying the
merged files.
If the primary doesn't have knowledge of replicas, some third service would
have to keep track of the replicas and let the primary know once all
replicas are done pre-copying the segments.
Alternatively, you can just skip pre-copying.
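That third service's bookkeeping is small: it only needs to record acks per merge and report when the set is complete. A hypothetical sketch (not nrtsearch code):

```python
class PreCopyTracker:
    """Tracks which replicas have finished pre-copying a merged segment
    set, so the primary can be told once all of them are done."""

    def __init__(self, replicas):
        self.replicas = set(replicas)
        self.acks = {}  # merge_id -> set of replicas that have finished

    def ack(self, merge_id, replica):
        """Record one replica's 'done copying' message; return True
        once every known replica has acked this merge."""
        done = self.acks.setdefault(merge_id, set())
        done.add(replica)
        return self.replicas <= done
```

In practice the replica set itself changes as nodes come and go, which is exactly why skipping pre-copy (at the cost of slower NRT point cutover) can be the simpler trade-off.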


NRT segment replication in AWS

2025-02-26 Thread Marc Davenport
Hello,
Our current search solution is a pretty big monolith running on pretty
beefy EC2 instances.  Every node is responsible for indexing and serving
queries.  We want to start decomposing our service and are starting with
separating the indexing and query handling responsibilities.

I'm in the research phases now trying to collect any prior art I can. The
rough sketch is to implement the two NRT replication node classes on their
respective services and use S3 as a distribution point for the segment
files.  I'm still debating if there should be some direct knowledge of the
replicas in the primary node. Or if the primary node can just churn away
creating base indexes and updates and publish to a queue when it produces a
new set of segments. The replicas are then free to pick up the latest
index as they spin up and subscribe to changes for it. It seems like having
the indexer also be responsible for communicating with the replicas
would be double duty for that system.  I'd love to hear other experiences
if people can share them or point to writings about them they read when
designing their systems.

I've looked at nrtsearch from Yelp and they seem to let the primary node
have direct knowledge of the replicas.  That makes sense since it is based
on McCandless's LuceneServer.

I know that Amazon internally uses Lucene and has indexing separated from
query nodes and that they re-index and publish completely new indexes with
every release to prod. I've been watching what I can of the great videos by
Sokolov, McCandless, Froh, etc. But they don't show much behind the
curtain (understandably) about the details of keeping things in sync.  If
someone does know of a publicly available video or resource that describes
this, I'd love to see it.


Thank you and looking forward to whatever resources or thoughts you have,
Marc