I like this!  I also think this should be in the core.  This would
allow one to partition an index and spread it over multiple disks, for
instance.
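
Something like this, say -- just a sketch on my part, with made-up
paths, and assuming the API stays as in the patch below:

  // each sub-index lives on its own disk and holds a slice of the fields
  ParallelReader reader = new ParallelReader();
  reader.add(IndexReader.open(FSDirectory.getDirectory("/disk1/index", false)));
  reader.add(IndexReader.open(FSDirectory.getDirectory("/disk2/index", false)));
  Searcher searcher = new IndexSearcher(reader);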

I like the name ParallelReader, but I wonder if people will confuse it
with "Aha, an IndexReader that reads multiple indices in parallel
(using threads)", since what makes ParallelMultiSearcher parallel is
its threading abilities.  Do I have a better name?  Hm... not right
now.  (MatchingReader?  TwinReader?  StripedReader?  Don't like any of
these too much...)

Otis



--- Doug Cutting <[EMAIL PROTECTED]> wrote:

> Please find attached something I wrote today.  It has not yet been
> tested extensively, and the documentation could be improved, but I
> thought it would be good to get comments sooner rather than later.
> 
> Would folks find this useful?
> 
> Should it go into the core or in contrib?
> 
> Doug
> Index: src/java/org/apache/lucene/index/ParallelReader.java
> ===================================================================
> --- src/java/org/apache/lucene/index/ParallelReader.java      (revision 0)
> +++ src/java/org/apache/lucene/index/ParallelReader.java      (revision 0)
> @@ -0,0 +1,329 @@
> +package org.apache.lucene.index;
> +
> +/**
> + * Copyright 2004 The Apache Software Foundation
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Field;
> +import org.apache.lucene.store.Directory;
> +
> +import java.io.IOException;
> +import java.util.*;
> +
> +/** An IndexReader which reads multiple, parallel indexes.  Each index added
> + * must have the same number of documents, but typically each contains
> + * different fields.  Each document contains the union of the fields of all
> + * documents with the same document number.  When searching, matches for a
> + * query term are from the first index added that has the field.
> + *
> + * <p>This is useful, e.g., with collections that have large fields which
> + * change rarely and small fields that change more frequently.  The smaller
> + * fields may be re-indexed in a new index and both indexes may be searched
> + * together.
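> + *
> + * <p>A sketch of typical usage (directory setup elided; names illustrative):
> + * <pre>
> + *   ParallelReader pr = new ParallelReader();
> + *   pr.add(IndexReader.open(bigFieldsDir));    // large, rarely re-indexed fields
> + *   pr.add(IndexReader.open(smallFieldsDir));  // small, frequently re-indexed fields
> + *   Searcher searcher = new IndexSearcher(pr);
> + * </pre>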
> + */
> +public class ParallelReader extends IndexReader {
> +  private ArrayList readers = new ArrayList();
> +  private SortedMap fieldToReader = new TreeMap();
> +  private ArrayList storedFieldReaders = new ArrayList(); 
> +
> +  private int maxDoc;
> +  private int numDocs;
> +  private boolean hasDeletions;
> +
> + /** Construct a ParallelReader. */
> +  public ParallelReader() throws IOException { super(null); }
> +    
> + /** Add an IndexReader. */
> +  public void add(IndexReader reader) throws IOException {
> +    add(reader, false);
> +  }
> +
> + /** Add an IndexReader whose stored fields will not be returned.  This can
> +  * accelerate search when stored fields are only needed from a subset of
> +  * the IndexReaders.
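> +  *
> +  * <p>For example (sketch; directory names are illustrative):
> +  * <pre>
> +  *   pr.add(IndexReader.open(storedFieldsDir));     // stored fields returned from here
> +  *   pr.add(IndexReader.open(searchOnlyDir), true); // searched, stored fields ignored
> +  * </pre> */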
> +  public void add(IndexReader reader, boolean ignoreStoredFields)
> +    throws IOException {
> +
> +    if (readers.size() == 0) {
> +      this.maxDoc = reader.maxDoc();
> +      this.numDocs = reader.numDocs();
> +      this.hasDeletions = reader.hasDeletions();
> +    }
> +
> +    if (reader.maxDoc() != maxDoc)                // check compatibility
> +      throw new IllegalArgumentException
> +        ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
> +    if (reader.numDocs() != numDocs)
> +      throw new IllegalArgumentException
> +        ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
> +    
> +    Iterator i = reader.getFieldNames().iterator();
> +    while (i.hasNext()) {                         // update fieldToReader map
> +      String field = (String)i.next();
> +      if (fieldToReader.get(field) == null)
> +        fieldToReader.put(field, reader);
> +    }
> +
> +    if (!ignoreStoredFields)
> +      storedFieldReaders.add(reader);             // add to storedFieldReaders
> +
> +  }
> +
> +
> +  public int numDocs() { return numDocs; }
> +
> +  public int maxDoc() { return maxDoc; }
> +
> +  public boolean hasDeletions() { return hasDeletions; }
> +
> +  // check first reader
> +  public boolean isDeleted(int n) {
> +    if (readers.size() > 0)
> +      return ((IndexReader)readers.get(0)).isDeleted(n);
> +    return false;
> +  }
> +
> +  // delete in all readers
> +  protected void doDelete(int n) throws IOException {
> +    for (int i = 0; i < readers.size(); i++) {
> +      ((IndexReader)readers.get(i)).doDelete(n);
> +    }
> +    hasDeletions = true;
> +  }
> +
> +  // undeleteAll in all readers
> +  protected void doUndeleteAll() throws IOException {
> +    for (int i = 0; i < readers.size(); i++) {
> +      ((IndexReader)readers.get(i)).doUndeleteAll();
> +    }
> +    hasDeletions = false;
> +  }
> +
> +  // append fields from storedFieldReaders
> +  public Document document(int n) throws IOException {
> +    Document result = new Document();
> +    for (int i = 0; i < storedFieldReaders.size(); i++) {
> +      IndexReader reader = (IndexReader)storedFieldReaders.get(i);
> +      Enumeration fields = reader.document(n).fields();
> +      while (fields.hasMoreElements()) {
> +        result.add((Field)fields.nextElement());
> +      }
> +    }
> +    return result;
> +  }
> +
> +  // get all vectors
> +  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
> +    ArrayList results = new ArrayList();
> +    Iterator i = fieldToReader.entrySet().iterator();
> +    while (i.hasNext()) {
> +      Map.Entry e = (Map.Entry)i.next();
> +      String field = (String)e.getKey();          // map is field -> reader
> +      IndexReader reader = (IndexReader)e.getValue();
> +      TermFreqVector vector = reader.getTermFreqVector(n, field);
> +      if (vector != null)
> +        results.add(vector);
> +    }
> +    return (TermFreqVector[])
> +      results.toArray(new TermFreqVector[results.size()]);
> +  }
> +
> +  public TermFreqVector getTermFreqVector(int n, String field)
> +    throws IOException {
> +    return ((IndexReader)fieldToReader.get(field)).getTermFreqVector(n, field);
> +  }
> +
> +  public byte[] norms(String field) throws IOException {
> +    return ((IndexReader)fieldToReader.get(field)).norms(field);
> +  }
> +
> +  public void norms(String field, byte[] result, int offset)
> +    throws IOException {
> +    ((IndexReader)fieldToReader.get(field)).norms(field, result, offset);
> +  }
> +
> +  protected void doSetNorm(int n, String field, byte value)
> +    throws IOException {
> +    ((IndexReader)fieldToReader.get(field)).doSetNorm(n, field, value);
> +  }
> +
> +  public TermEnum terms() throws IOException {
> +    return new ParallelTermEnum();
> +  }
> +
> +  public TermEnum terms(Term term) throws IOException {
> +    return new ParallelTermEnum(term);
> +  }
> +
> +  public int docFreq(Term term) throws IOException {
> +    return ((IndexReader)fieldToReader.get(term.field())).docFreq(term);
> +  }
> +
> +  public TermDocs termDocs(Term term) throws IOException {
> +    return new ParallelTermDocs(term);
> +  }
> +
> +  public TermDocs termDocs() throws IOException {
> +    return new ParallelTermDocs();
> +  }
> +
> +  public TermPositions termPositions(Term term) throws IOException {
> +    return new ParallelTermPositions(term);
> +  }
> +
> +  public TermPositions termPositions() throws IOException {
> +    return new ParallelTermPositions();
> +  }
> +
> +  protected void doCommit() throws IOException {
> +    for (int i = 0; i < readers.size(); i++)
> +      ((IndexReader)readers.get(i)).commit();
> +  }
> +
> +  protected synchronized void doClose() throws IOException {
> +    for (int i = 0; i < readers.size(); i++)
> +      ((IndexReader)readers.get(i)).close();
> +  }
> +
> +  public Collection getFieldNames() throws IOException {
> +    return fieldToReader.keySet();
> +  }
> +
> +  public Collection getFieldNames(boolean indexed) throws IOException {
> +    Set fieldSet = new HashSet();
> +    for (int i = 0; i < readers.size(); i++) {
> +      IndexReader reader = ((IndexReader)readers.get(i));
> +      Collection names = reader.getFieldNames(indexed);
> +      fieldSet.addAll(names);
> +    }
> +    return fieldSet;
> +  }
> +
> +  public Collection getIndexedFieldNames(Field.TermVector tvSpec) {
> +    Set fieldSet = new HashSet();
> +    for (int i = 0; i < readers.size(); i++) {
> +      IndexReader reader = ((IndexReader)readers.get(i));
> +      Collection names = reader.getIndexedFieldNames(tvSpec);
> +      fieldSet.addAll(names);
> +    }
> +    return fieldSet;
> +  }
> +
> +  public Collection getFieldNames(IndexReader.FieldOption fieldNames) {
> +    Set fieldSet = new HashSet();
> +    for (int i = 0; i < readers.size(); i++) {
> +      IndexReader reader = ((IndexReader)readers.get(i));
> +      Collection names = reader.getFieldNames(fieldNames);
> +      fieldSet.addAll(names);
> +    }
> +    return fieldSet;
> +  }
> +
> +  private class ParallelTermEnum extends TermEnum {
> +    private String field;
> +    private TermEnum termEnum;                  // "enum" is reserved in Java 5+
> +
> +    public ParallelTermEnum() throws IOException {
> +      if (!fieldToReader.isEmpty()) {           // firstKey() throws on an empty map
> +        field = (String)fieldToReader.firstKey();
> +        termEnum = ((IndexReader)fieldToReader.get(field)).terms();
> +      }
> +    }
> +
> +    public ParallelTermEnum(Term term) throws IOException {
> +      field = term.field();
> +      termEnum = ((IndexReader)fieldToReader.get(field)).terms(term);
> +    }
> +
> +    public boolean next() throws IOException {
> +      if (field == null)
> +        return false;
> +
> +      boolean next = termEnum.next();
> +
> +      // still within field?  (field names are interned, so == is safe)
> +      if (next && termEnum.term().field() == field)
> +        return true;                              // yes, keep going
> +
> +      termEnum.close();                           // close old enum
> +
> +      // find the next field, if any; tailMap includes the current
> +      // field itself, so skip past it
> +      Iterator nextFields = fieldToReader.tailMap(field).keySet().iterator();
> +      nextFields.next();                          // skip current field
> +      while (nextFields.hasNext()) {
> +        field = (String)nextFields.next();
> +        termEnum = ((IndexReader)fieldToReader.get(field)).terms(new Term(field, ""));
> +        Term t = termEnum.term();                 // terms(Term) is pre-positioned
> +        if (t != null && t.field() == field)
> +          return true;
> +        termEnum.close();
> +      }
> +
> +      field = null;
> +      return false;                               // no more fields
> +    }
> +
> +    public Term term() { return termEnum == null ? null : termEnum.term(); }
> +    public int docFreq() { return termEnum == null ? 0 : termEnum.docFreq(); }
> +    public void close() throws IOException { if (termEnum != null) termEnum.close(); }
> +
> +  }
> +
> +  // wrap a TermDocs in order to support seek(Term)
> +  private class ParallelTermDocs implements TermDocs {
> +    protected TermDocs termDocs;
> +
> +    public ParallelTermDocs() {}
> +    public ParallelTermDocs(Term term) throws IOException { seek(term); }
> +
> +    public int doc() { return termDocs.doc(); }
> +    public int freq() { return termDocs.freq(); }
> +
> +    public void seek(Term term) throws IOException {
> +      termDocs = ((IndexReader)fieldToReader.get(term.field())).termDocs(term);
> +    }
> +
> +    public void seek(TermEnum termEnum) throws IOException {
> +      seek(termEnum.term());
> +    }
> +
> +    public boolean next() throws IOException { return termDocs.next(); }
> +
> +    public int read(final int[] docs, final int[] freqs) throws IOException {
> +      return termDocs.read(docs, freqs);
> +    }
> +
> +    public boolean skipTo(int target) throws IOException {
> +      return termDocs.skipTo(target);
> +    }
> +
> +    public void close() throws IOException { termDocs.close(); }
> +
> +  }
> +
> +  private class ParallelTermPositions
> +    extends ParallelTermDocs implements TermPositions {
> +
> +    public ParallelTermPositions() {}
> +    public ParallelTermPositions(Term term) throws IOException { seek(term); }
> +
> +    public void seek(Term term) throws IOException {
> +      termDocs = ((IndexReader)fieldToReader.get(term.field()))
> +        .termPositions(term);
> +    }
> +
> +    public int nextPosition() throws IOException {
> +      return ((TermPositions)termDocs).nextPosition();
> +    }
> +
> +  }
> +
> +}
> +
> Index: src/test/org/apache/lucene/index/TestParallelReader.java
> ===================================================================
> --- src/test/org/apache/lucene/index/TestParallelReader.java  (revision 0)
> +++ src/test/org/apache/lucene/index/TestParallelReader.java  (revision 0)
> @@ -0,0 +1,128 @@
> +package org.apache.lucene.index;
> +
> +/**
> + * Copyright 2005 The Apache Software Foundation
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +import java.io.IOException;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.lucene.analysis.standard.StandardAnalyzer;
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Field;
> +import org.apache.lucene.index.IndexWriter;
> +import org.apache.lucene.index.Term;
> +import org.apache.lucene.store.Directory;
> +import org.apache.lucene.store.RAMDirectory;
> +import org.apache.lucene.search.BooleanClause.Occur;
> +import org.apache.lucene.search.*;
> +
> +public class TestParallelReader extends TestCase {
> +
> +  private Searcher parallel;
> +  private Searcher single;
> +  
> +  protected void setUp() throws Exception {
> +    single = single();
> +    parallel = parallel();
> +  }
> +
> +  public void testQueries() throws Exception {
> +    queryTest(new TermQuery(new Term("f1", "v1")));
> +    queryTest(new TermQuery(new Term("f1", "v2")));
> +    queryTest(new TermQuery(new Term("f2", "v1")));
> +    queryTest(new TermQuery(new Term("f2", "v2")));
> +    queryTest(new TermQuery(new Term("f3", "v1")));
> +    queryTest(new TermQuery(new Term("f3", "v2")));
> +    queryTest(new TermQuery(new Term("f4", "v1")));
> +    queryTest(new TermQuery(new Term("f4", "v2")));
> +
> +    BooleanQuery bq1 = new BooleanQuery();
> +    bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST);
> +    bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST);
> +    queryTest(bq1);
> +
> +  }
> +
> +  private void queryTest(Query query) throws IOException {
> +    Hits parallelHits = parallel.search(query);
> +    Hits singleHits = single.search(query);
> +    assertEquals(parallelHits.length(), singleHits.length());
> +    for(int i = 0; i < parallelHits.length(); i++) {
> +      assertEquals(parallelHits.score(i), singleHits.score(i), 0.001f);
> +      Document docParallel = parallelHits.doc(i);
> +      Document docSingle = singleHits.doc(i);
> +      assertEquals(docParallel.get("f1"), docSingle.get("f1"));
> +      assertEquals(docParallel.get("f2"), docSingle.get("f2"));
> +      assertEquals(docParallel.get("f3"), docSingle.get("f3"));
> +      assertEquals(docParallel.get("f4"), docSingle.get("f4"));
> +    }
> +  }
> +
> +  // Fields 1-4 indexed together:
> +  private Searcher single() throws IOException {
> +    Directory dir = new RAMDirectory();
> +    IndexWriter w = new IndexWriter(dir, new StandardAnalyzer(), true);
> +    Document d1 = new Document();
> +    d1.add(new Field("f1", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d1.add(new Field("f2", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d1.add(new Field("f3", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d1.add(new Field("f4", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    w.addDocument(d1);
> +    Document d2 = new Document();
> +    d2.add(new Field("f1", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d2.add(new Field("f2", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d2.add(new Field("f3", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d2.add(new Field("f4", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    w.addDocument(d2);
> +    w.close();
> +
> +    return new IndexSearcher(dir);
> +  }
> +
> +  // Fields 1 & 2 in one index, 3 & 4 in other, with ParallelReader:
> +  private Searcher parallel() throws IOException {
> +    Directory dir1 = new RAMDirectory();
> +    IndexWriter w1 = new IndexWriter(dir1, new StandardAnalyzer(), true);
> +    Document d1 = new Document();
> +    d1.add(new Field("f1", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d1.add(new Field("f2", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    w1.addDocument(d1);
> +    Document d2 = new Document();
> +    d2.add(new Field("f1", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d2.add(new Field("f2", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    w1.addDocument(d2);
> +    w1.close();
> +
> +    Directory dir2 = new RAMDirectory();
> +    IndexWriter w2 = new IndexWriter(dir2, new StandardAnalyzer(), true);
> +    Document d3 = new Document();
> +    d3.add(new Field("f3", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d3.add(new Field("f4", "v1", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    w2.addDocument(d3);
> +    Document d4 = new Document();
> +    d4.add(new Field("f3", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    d4.add(new Field("f4", "v2", Field.Store.YES,
> Field.Index.TOKENIZED));
> +    w2.addDocument(d4);
> +    w2.close();
> +    
> +    ParallelReader pr = new ParallelReader();
> +    pr.add(IndexReader.open(dir1));
> +    pr.add(IndexReader.open(dir2));
> +
> +    return new IndexSearcher(pr);
> +  }
> +}
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]
