There have been several proposals for a Lucene-based distributed index architecture:

  1) Doug Cutting's "Index Server Project Proposal" at http://www.mail-archive.com/[EMAIL PROTECTED]/msg00338.html
  2) Solr's "Distributed Search" at http://wiki.apache.org/solr/DistributedSearch
  3) Mark Butler's "Distributed Lucene" at http://wiki.apache.org/hadoop/DistributedLucene

We have also been working on a Lucene-based distributed index architecture. Our design differs from the above proposals in the way it leverages Hadoop as much as possible. In particular, HDFS is used to reliably store Lucene instances, Map/Reduce is used to analyze documents and update Lucene instances in parallel, and Hadoop's IPC framework is used for the RPC calls among the index clients, the shard servers, and the master. Our design is geared for applications that require a highly scalable index and where batch updates to each Lucene instance are acceptable (versus finer-grained, document-at-a-time updates).

We have a working implementation of our design and are in the process of evaluating its performance. An overview of our design is provided below. We welcome feedback and would like to know if you are interested in working on it. If so, we would be happy to make the code publicly available. At the same time, we would like to collaborate with people working on existing proposals and see if we can consolidate our efforts.

TERMINOLOGY

A distributed "index" is partitioned into "shards". Each shard corresponds to a Lucene instance and contains a disjoint subset of the documents in the index. Each shard is stored in HDFS and served by one or more "shard servers". Here we only talk about a single distributed index, but in practice multiple indexes can be supported.

A "master" keeps track of the shard servers and the shards being served by them. An "application" updates and queries the global index through an "index client". An index client communicates with the shard servers to execute a query.

KEY RPC METHODS

This section lists the key RPC methods in our design. To simplify the discussion, some of their parameters have been omitted.

On the Shard Servers

  // Execute a query on this shard server's Lucene instance.
  // This method is called by an index client.
  SearchResults search(Query query);

On the Master

  // Tell the master to update the shards, i.e., Lucene instances.
  // This method is called by an index client.
  boolean updateShards(Configuration conf);

  // Ask the master where the shards are located.
  // This method is called by an index client.
  LocatedShards getShardLocations();

  // Send a heartbeat to the master. This method is called by a
  // shard server. In the response, the master informs the
  // shard server when to switch to a newer version of the index.
  ShardServerCommand sendHeartbeat();

QUERYING THE INDEX

To query the index, an application sends a search request to an index client. The index client then calls the shard server search() method for each shard of the index, merges the results, and returns them to the application. The index client caches the mapping between shards and shard servers, refreshing it by periodically calling the master's getShardLocations() method.

UPDATING THE INDEX USING MAP/REDUCE

To update the index, an application sends an update request to an index client. The index client then calls the master's updateShards() method, which schedules a Map/Reduce job to update the index. The Map/Reduce job updates the shards in parallel and copies the new index files of each shard (i.e., Lucene instance) to HDFS.

The updateShards() method takes a "configuration", which provides the information needed to update the shards. More specifically, the configuration includes the following:

  - Input path. This provides the location of the updated documents, e.g., HDFS files or directories, or HBase tables.
  - Input formatter. This specifies how to format the input documents.
  - Analysis. This defines the analyzer to use on the input. The analyzer determines whether a document is being inserted, updated, or deleted. For inserts or updates, the analyzer also converts each input document into a Lucene document.

The Map phase of the Map/Reduce job formats and analyzes the input (in parallel), while the Reduce phase collects and applies the updates to each Lucene instance (again in parallel).

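As a rough illustration of the Map and Reduce phases described above, here is a minimal plain-Java sketch. It simulates the job in a single process and does not use the Hadoop or Lucene APIs. The class ShardUpdateSketch, the tab-separated input format, the hash-based assignment of documents to shards, and the in-memory map standing in for a Lucene instance are all assumptions made for illustration and are not taken from the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Single-process simulation of the update job (hypothetical names throughout).
final class ShardUpdateSketch {

    // Operation kinds the analyzer can assign to an input document.
    enum Op { INSERT, UPDATE, DELETE }

    // One analyzed input record; text is null for deletes.
    static final class Update {
        final Op op;
        final String docId;
        final String text;

        Update(Op op, String docId, String text) {
            this.op = op;
            this.docId = docId;
            this.text = text;
        }
    }

    // Assumption: a document is assigned to a shard by hashing its id.
    static int shardFor(String docId, int numShards) {
        return Math.floorMod(docId.hashCode(), numShards);
    }

    // "Analysis": parse a line of the assumed form op<TAB>docId<TAB>text.
    static Update analyze(String line) {
        String[] fields = line.split("\t", 3);
        Op op = Op.valueOf(fields[0].toUpperCase(Locale.ROOT));
        String text = fields.length > 2 ? fields[2] : null;
        return new Update(op, fields[1], text);
    }

    // Map phase: analyze every input record and group it by target shard.
    static Map<Integer, List<Update>> mapPhase(List<String> input, int numShards) {
        Map<Integer, List<Update>> byShard = new TreeMap<>();
        for (String line : input) {
            Update u = analyze(line);
            int shard = shardFor(u.docId, numShards);
            byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(u);
        }
        return byShard;
    }

    // Reduce phase for one shard: apply its updates in input order.
    // The map from docId to text stands in for a Lucene instance.
    static void reducePhase(List<Update> updates, Map<String, String> shard) {
        for (Update u : updates) {
            if (u.op == Op.DELETE) {
                shard.remove(u.docId);
            } else {
                shard.put(u.docId, u.text); // insert or update
            }
        }
    }
}
```

In this sketch, mapPhase() is called once over the whole input and reducePhase() once per shard. In a real job, the analysis work and the per-shard application of updates would each run in parallel across tasks.
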
The updates are applied using the local file system where a Reduce task runs and then copied back to HDFS. For example, if the updates caused a new Lucene segment to be created, the new segment would be created on the local file system first, and then copied back to HDFS.

When the Map/Reduce job completes, a "new version" of the index is ready to be queried. It is important to note that the new version of the index is not derived from scratch. By leveraging Lucene's update algorithm, the new version of each Lucene instance will share as many files as possible with the previous version.

ENSURING INDEX CONSISTENCY

At any point in time, an index client has a consistent view of the shards in the index. The results of a search query include either all or none of a recent update to the index. The details of the algorithm to accomplish this are omitted here, but the basic flow is simple.

After the Map/Reduce job to update the shards completes, the master will tell each shard server to "prepare" the new version of the index. After all the shard servers have responded affirmatively to the "prepare" message, the new index is ready to be queried. An index client will then lazily learn about the new index when it makes its next getShardLocations() call to the master. In essence, a lazy two-phase commit protocol is used, with "prepare" and "commit" messages piggybacked on heartbeats. After a shard has switched to the new index, the Lucene files in the old index that are no longer needed can safely be deleted.

ACHIEVING FAULT-TOLERANCE

We rely on the fault-tolerance of Map/Reduce to guarantee that an index update will eventually succeed. All shards are stored in HDFS and can be read by any shard server in a cluster. For a given shard, if one of its shard servers dies, new search requests are handled by its surviving shard servers.

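The client-side half of this failover could look roughly like the following plain-Java sketch. The class ShardFailoverSketch, the pickServer() method, and the idea that the index client keeps a set of servers it believes to be dead (for example, after a failed search() call) are assumptions made for illustration and are not taken from the actual implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper an index client might use to route a search request.
final class ShardFailoverSketch {

    // Return the first server for the shard that is not known to be dead,
    // or null if the cached locations contain no surviving server.
    static String pickServer(String shardId,
                             Map<String, List<String>> cachedLocations,
                             Set<String> deadServers) {
        List<String> servers = cachedLocations.get(shardId);
        if (servers == null) {
            return null; // shard unknown to the cache
        }
        for (String server : servers) {
            if (!deadServers.contains(server)) {
                return server;
            }
        }
        return null; // caller would refresh via getShardLocations()
    }
}
```

A null result would be the cue for the client to refresh its cached mapping from the master before retrying.
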
To ensure that there is always enough coverage for a shard, the master will instruct other shard servers to take over the shards of a dead shard server.

PERFORMANCE ISSUES

Currently, each shard server reads a shard directly from HDFS. Experiments have shown that this approach performs poorly: reading through HDFS slows Lucene down by well over 5x when data blocks are accessed over the network. Consequently, we are exploring different ways to leverage the fault tolerance of HDFS and, at the same time, work around its performance problems.

One simple alternative is to add a local file system cache on each shard server. Another alternative is to modify HDFS so that an application has more control over where to store the primary copy and the replicas of an HDFS block. This feature may be useful for other HDFS applications (e.g., HBase). We would like to collaborate with other people who are interested in adding this feature to HDFS.

Regards,
Ning Li