Ning,

I am also interested in starting a new project in this area. The approach I have in mind is slightly different, but hopefully we can come to some agreement and collaborate.

My current thinking is that the Solr search API is the appropriate model. Solr's facets are an important feature that require low-level support to be practical. Thus a useful distributed search system should support facets from the outset, rather than attempt to graft them on later. In particular, I believe this requirement mandates disjoint shards.

My primary difference with your proposal is that I would like to support online indexing. Documents could be inserted and removed directly, and shards would synchronize changes amongst replicas, with an "eventual consistency" model. Indexes would not be stored in HDFS, but directly on the local disk of each node. Hadoop would perhaps not play a role. In many ways this would resemble CouchDB, but with explicit support for sharding and failover from the outset.

A particular client should be able to provide a consistent read/write view by bonding to particular replicas of a shard. Thus a user who makes a modification should be able to generally see that modification in results immediately, while other users, talking to different replicas, may not see it until synchronization is complete.

There are many unresolved issues in my mind around sharding and replication that I hope to reach some clarity on before beginning implementation. Does this sound like it could be of interest to you?

Cheers,

Doug

Ning Li wrote:
There have been several proposals for a Lucene-based distributed index
architecture.
 1) Doug Cutting's "Index Server Project Proposal" at
    http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
 2) Solr's "Distributed Search" at
    http://wiki.apache.org/solr/DistributedSearch
 3) Mark Butler's "Distributed Lucene" at
    http://wiki.apache.org/hadoop/DistributedLucene

We have also been working on a Lucene-based distributed index architecture.
Our design differs from the above proposals in the way it leverages Hadoop
as much as possible. In particular, HDFS is used to reliably store Lucene
instances, Map/Reduce is used to analyze documents and update Lucene
instances
in parallel, and Hadoop's IPC framework is used. Our design is geared for
applications that require a highly scalable index and where batch updates
to each Lucene instance are acceptable (verses finer-grained document at
a time updates).

We have a working implementation of our design and are in the process
of evaluating its performance. An overview of our design is provided below.
We welcome feedback and would like to know if you are interested in working
on it. If so, we would be happy to make the code publicly available. At the
same time, we would like to collaborate with people working on existing
proposals and see if we can consolidate our efforts.

TERMINOLOGY
A distributed "index" is partitioned into "shards". Each shard corresponds
to
a Lucene instance and contains a disjoint subset of the documents in the
index.
Each shard is stored in HDFS and served by one or more "shard servers". Here
we only talk about a single distributed index, but in practice multiple
indexes
can be supported.

A "master" keeps track of the shard servers and the shards being served by
them. An "application" updates and queries the global index through an
"index client". An index client communicates with the shard servers to
execute a query.

KEY RPC METHODS
This section lists the key RPC methods in our design. To simplify the
discussion, some of their parameters have been omitted.

  On the Shard Servers
    // Execute a query on this shard server's Lucene instance.
    // This method is called by an index client.
    SearchResults search(Query query);

  On the Master
    // Tell the master to update the shards, i.e., Lucene instances.
    // This method is called by an index client.
    boolean updateShards(Configuration conf);

    // Ask the master where the shards are located.
    // This method is called by an index client.
    LocatedShards getShardLocations();

    // Send a heartbeat to the master. This method is called by a
    // shard server. In the response, the master informs the
    // shard server when to switch to a newer version of the index.
    ShardServerCommand sendHeartbeat();

QUERYING THE INDEX
To query the index, an application sends a search request to an index
client.
The index client then calls the shard server search() method for each shard
of the index, merges the results and returns them to the application. The
index client caches the mapping between shards and shard servers by
periodically calling the master's getShardLocations() method.

UPDATING THE INDEX USING MAP/REDUCE
To update the index, an application sends an update request to an index
client.
The index client then calls the master's updateShards() method, which
schedules
a Map/Reduce job to update the index. The Map/Reduce job updates the shards
in
parallel and copies the new index files of each shard (i.e., Lucene
instance)
to HDFS.

The updateShards() method includes a "configuration", which provides
information for updating the shards. More specifically, the configuration
includes the following information:
  - Input path. This provides the location of updated documents, e.g., HDFS
    files or directories, or HBase tables.
  - Input formatter. This specifies how to format the input documents.
  - Analysis. This defines the analyzer to use on the input. The analyzer
    determines whether a document is being inserted, updated, or deleted.
For
    inserts or updates, the analyzer also converts each input document into
    a Lucene document.

The Map phase of the Map/Reduce job formats and analyzes the input (in
parallel), while the Reduce phase collects and applies the updates to each
Lucene instance (again in parallel). The updates are applied using the local
file system where a Reduce task runs and then copied back to HDFS. For
example,
if the updates caused a new Lucene segment to be created, the new segment
would be created on the local file system first, and then copied back to
HDFS.

When the Map/Reduce job completes, a "new version" of the index is ready to
be
queried. It is important to note that the new version of the index is not
derived from scratch. By leveraging Lucene's update algorithm, the new
version
of each Lucene instance will share as many files as possible as the previous
version.

ENSURING INDEX CONSISTENCY
At any point in time, an index client always has a consistent view of the
shards in the index. The results of a search query include either all or
none
of a recent update to the index. The details of the algorithm to accomplish
this are omitted here, but the basic flow is pretty simple.

After the Map/Reduce job to update the shards completes, the master will
tell
each shard server to "prepare" the new version of the index. After all the
shard servers have responded affirmatively to the "prepare" message, the new

index is ready to be queried. An index client will then lazily learn about
the new index when it makes its next getShardLocations() call to the master.

In essence, a lazy two-phase commit protocol is used, with "prepare" and
"commit" messages piggybacked on heartbeats. After a shard has switched to
the new index, the Lucene files in the old index that are no longer needed
can safely be deleted.

ACHIEVING FAULT-TOLERANCE
We rely on the fault-tolerance of Map/Reduce to guarantee that an index
update
will eventually succeed. All shards are stored in HDFS and can be read by
any
shard server in a cluster. For a given shard, if one of its shard servers
dies,
new search requests are handled by its surviving shard servers. To ensure
that
there is always enough coverage for a shard, the master will instruct other
shard servers to take over the shards of a dead shard server.

PERFORMANCE ISSUES
Currently, each shard server reads a shard directly from HDFS. Experiments
have shown that this approach does not perform very well, with HDFS causing
Lucene to slow down fairly dramatically (by well over 5x when data blocks
are
accessed over the network). Consequently, we are exploring different ways to
leverage the fault tolerance of HDFS and, at the same time, work around its
performance problems. One simple alternative is to add a local file system
cache on each shard server. Another alternative is to modify HDFS so that an
application has more control over where to store the primary and replicas of
an HDFS block. This feature may be useful for other HDFS applications (e.g.,
HBase). We would like to collaborate with other people who are interested in
adding this feature to HDFS.


Regards,
Ning Li


Reply via email to