Revision: 17226
          http://sourceforge.net/p/gate/code/17226
Author:   valyt
Date:     2014-01-14 17:25:33 +0000 (Tue, 14 Jan 2014)
Log Message:
-----------
Added a light dusting of synchronization to make sure all sub-indexes break the 
document stream into batches at the same location. This is needed to support 
live-searchable indexes, as the maximum document pointer must be the same in 
all sub-indexes.

Moved the zip collection functionality to MimirIndex.

Moved the document deletion functionality to MimirIndex.

Started work on supporting search for M?\195?\173mir 5.0 indexes.

Modified Paths:
--------------
    mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        
2014-01-14 17:25:33 UTC (rev 17226)
@@ -24,15 +24,38 @@
 import gate.mimir.index.IndexException;
 import gate.mimir.index.mg4j.GATEDocument;
 import gate.mimir.index.mg4j.MimirIndexBuilder;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollection;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollectionWriter;
+import gate.mimir.index.mg4j.zipcollection.DocumentData;
 import gate.mimir.search.QueryEngine;
 import gate.util.GateRuntimeException;
+import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.log4j.Logger;
 
@@ -58,16 +81,65 @@
    */
   public static final String INDEX_CONFIG_FILENAME = "config.xml";
   
+  /**
+   * The name for the file (stored in the root index directory) containing 
+   * the serialised version of the {@link #deletedDocumentIds}. 
+   */
+  public static final String DELETED_DOCUMENT_IDS_FILE_NAME = "deleted.ser";
+  
+  /**
+   * How many occurrences to index in each batch. This metric is more 
reliable, 
+   * than document counts, as it does not depend on average document size. 
+   */
+  public static final int DEFAULT_OCCURRENCES_PER_BATCH = 100 * 1000 * 1000;
+  
+  /**
+   * The default length for the buffer input / output queues for sub-indexers.
+   */
+  public static final int DEFAULT_INDEXING_QUEUE_SIZE = 30;
+  
+  /**
+   * The maximum number of documents to be stored in the document cache.
+   */
+  protected static final int DOCUMENT_DATA_CACHE_SIZE = 100;
+  
+  /**
+   * How many occurrences to be accumulated in RAM before a new tail batch is
+   * written to disk.
+   */
+  protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
+  
   private static final Logger logger = Logger.getLogger(MimirIndex.class);
   
   /**
-   * A {@link Runnable} that collects the documents from the sub-indexers and
-   * deletes them from GATE.
+   * A {@link Runnable} that performs various index maintenance in a separate
+   * thread. The principal role is to collect the documents from the 
+   * sub-indexers and delete them from GATE.
    */
-  protected class DocumentCollector implements Runnable{
+  protected class IndexMaintenanceRunner implements Runnable{
     public void run(){
       boolean finished = false;
       while(!finished){
+        // if any batches have finished writing, then decrease the 
+        // occurrencesInRam value
+        Iterator<Future<Long>> futureIter = batchesBeingDumped.iterator();
+        while(futureIter.hasNext()) {
+          Future<Long> aFuture = futureIter.next();
+          try {
+            if(aFuture.isDone()) {
+              occurrencesInRam -= aFuture.get();
+              futureIter.remove();
+            }
+          } catch(InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } catch(ExecutionException e) {
+            // we never set an exception on these futures, so this should never
+            // happen
+            logger.error(
+                "Got unexpected exception from future doing batch dump", e);
+          }
+        }
+        
         GATEDocument currentDocument = null;
         try {
           //get one document from each of the sub-indexers
@@ -103,6 +175,31 @@
     }
   }  
   
+  
+  private class WriteDeletedDocsTask extends TimerTask {
+    public void run() {
+      synchronized(writeDeletedDocsTimer) {
+        File delFile = new File(indexDirectory, 
DELETED_DOCUMENT_IDS_FILE_NAME);
+        if(delFile.exists()) {
+          delFile.delete();
+        }
+        try{
+          logger.debug("Writing deleted documents set");
+          ObjectOutputStream oos = new ObjectOutputStream(
+                  new GZIPOutputStream(
+                  new BufferedOutputStream(
+                  new FileOutputStream(delFile))));
+          oos.writeObject(deletedDocumentIds);
+          oos.flush();
+          oos.close();
+          logger.debug("Writing deleted documents set completed.");
+        }catch (IOException e) {
+          logger.error("Exception while writing deleted documents set", e);
+        }        
+      }
+    }
+  }
+  
   /**
    * The {@link IndexConfig} used for this index.
    */
@@ -114,14 +211,56 @@
   protected File indexDirectory;
   
   /**
+   * A zip collection builder used to build a zip of the collection
+   * if this has been requested.
+   */
+  protected DocumentCollectionWriter collectionWriter = null;
+  
+  /**
+   * The zipped document collection from MG4J (built during the indexing of the
+   * first token feature). This can be used to obtain the document text and to
+   * display the content of the hits.
+   */
+  protected DocumentCollection documentCollection;
+  
+  /**
+   * A cache of {@link DocumentData} values used for returning the various
+   * document details (title, URI, text).
+   */
+  protected Long2ObjectLinkedOpenHashMap<DocumentData> documentCache;
+  
+  /**
    * The thread used to clean-up GATE documents after they have been indexed.
    */
   protected Thread documentsCollectorThread;
   
+  
+  /**
+   * A (synchronized) collection of futures that work on saving batches.
+   */
+  protected List<Future<Long>> batchesBeingDumped;
+  
   protected volatile boolean closed = false;
   
+  /**
+   * The set of IDs for the documents marked as deleted. 
+   */
+  private transient SortedSet<Long> deletedDocumentIds;
   
   /**
+   * A timer used to execute the writing of deleted documents data to disk.
+   * This timer is used to create a delay, allowing a batch of writes to be 
+   * coalesced into a single one.
+   */
+  private transient Timer writeDeletedDocsTimer;
+  
+  /**
+   * The timer task used to top write to disk the deleted documents data.
+   * This value is non-null only when there is a pending write. 
+   */
+  private volatile transient WriteDeletedDocsTask writeDeletedDocsTask;
+  
+  /**
    * The token indexes, in the order they are listed in the {@link 
#indexConfig}.
    */
   protected AtomicTokenIndex[] tokenIndexes;
@@ -137,12 +276,21 @@
    */
   protected AtomicIndex[] subIndexes;
   
+  protected int indexingQueueSize = DEFAULT_INDEXING_QUEUE_SIZE;
   
   /**
+   * The total number of occurrences in all sub-indexes that have not yet been
+   * written to disk.
+   */
+  protected long occurrencesInRam;
+  
+  /**
    * Create a new Index.
    * @param indexConfig the configuration for the index.
+   * @throws IOException 
+   * @throws IndexException 
    */
-  public MimirIndex(IndexConfig indexConfig) {
+  public MimirIndex(IndexConfig indexConfig) throws IOException, 
IndexException {
     this.indexConfig = indexConfig;
     this.indexDirectory = this.indexConfig.getIndexDirectory();
     
@@ -175,8 +323,13 @@
    * Opens the index files, if any, prepares all the sub-indexers specified in 
    * the index config, and gets this index ready to start indexing documents 
and
    * answer queries. 
+   * @throws IOException 
+   * @throws IndexException 
    */
-  protected void openIndex() {
+  protected void openIndex() throws IOException, IndexException {
+    // ####################
+    // Prepare for indexing
+    // ####################
     // read the index config and create the sub-indexers
     TokenIndexerConfig tokConfs[] = indexConfig.getTokenIndexers();
     tokenIndexes = new AtomicTokenIndex[tokConfs.length];
@@ -187,8 +340,8 @@
           subIndexname, 
           new File(indexDirectory, subIndexname), 
           tokConfs[i].isDirectIndexEnabled(),
-          new LinkedBlockingQueue<GATEDocument>(),
-          new LinkedBlockingQueue<GATEDocument>(),
+          new LinkedBlockingQueue<GATEDocument>(indexingQueueSize),
+          new LinkedBlockingQueue<GATEDocument>(indexingQueueSize),
           tokConfs[i],
           i == 0);
     }
@@ -213,9 +366,32 @@
     System.arraycopy(mentionIndexes, 0, subIndexes, tokenIndexes.length, 
         mentionIndexes.length);
     
-    // start the collector thread
-    documentsCollectorThread = new Thread(new DocumentCollector());
+    occurrencesInRam = 0;
+    
+    batchesBeingDumped = Collections.synchronizedList(new 
LinkedList<Future<Long>>());
+    
+    // start the documents collector thread
+    documentsCollectorThread = new Thread(new IndexMaintenanceRunner());
     documentsCollectorThread.start();
+
+    // prepare the zip collection writer
+    logger.info("Creating zipped collection for field \"" + 
+        tokConfs[0].getFeatureName() + "\"");
+    collectionWriter = new DocumentCollectionWriter(indexDirectory);
+
+    // #####################
+    // Prepare for searching
+    // #####################
+    readDeletedDocs();
+    // start the timer that regularly writes the deleted documents list
+    writeDeletedDocsTimer = new Timer("Delete documents writer");
+
+    // open the zipped document collection
+    documentCollection = new DocumentCollection(indexDirectory);
+    
+    // prepare the document cache
+    documentCache = new Long2ObjectLinkedOpenHashMap<DocumentData>();
+    
   }
   
   /**
@@ -228,13 +404,43 @@
   public void indexDocument(Document document) throws InterruptedException {
     if(closed) throw new IllegalStateException("This index has been closed, "
         + "no further documents can be indexed.");
+    
+    // check if we need to write a new batch:
+    // we have too many occurrences and 
+    // there are no outstanding batch writing operations
+    if(occurrencesInRam > occurrencesPerBatch && batchesBeingDumped.isEmpty()) 
{
+      for(AtomicIndex aSubIndex: subIndexes){
+        batchesBeingDumped.add(aSubIndex.requestDumpToDisk());
+      }
+    }
     GATEDocument gDocument = new GATEDocument(document, indexConfig);
     for(AtomicIndex aSubIndex: subIndexes){
       aSubIndex.getInputQueue().put(gDocument);
     }
+
   }
+
+  /**
+   * Notifies this index that more occurrences have been stored in RAM by one 
of
+   * its sub-indexes.
+   * 
+   * @param occurrences
+   */
+  synchronized public void addOccurrences(long occurrences) {
+    occurrencesInRam += occurrences;
+  }
   
   /**
+   * Called by the first token indexer when a new document has been indexed
+   * to ask the main index to save the necessary zip collection data
+   * @param gDocument
+   * @throws IndexException 
+   */
+  public void writeZipDocumentData(DocumentData docData) throws IndexException 
{
+    collectionWriter.writeDocument(docData);
+  }
+  
+  /**
    * Stops this index from accepting any further document for indexing, stops
    * this index from accepting any more queries, finishes indexing all the 
    * currently queued documents, writes all the files to disk, and returns.
@@ -247,6 +453,20 @@
         aSubIndex.getInputQueue().put(GATEDocument.END_OF_QUEUE);
       }      
     }
+    // write the deleted documents set
+    synchronized(writeDeletedDocsTimer) {
+      if(writeDeletedDocsTask != null) {
+        writeDeletedDocsTask.cancel();
+      }
+      writeDeletedDocsTimer.cancel();
+      // explicitly call it one last time
+      new WriteDeletedDocsTask().run();
+    }
+    // close the document collection
+    documentCollection.close();
+
+    documentCache.clear();
+
     // wait for indexing to end
     documentsCollectorThread.join();
   }
@@ -256,11 +476,160 @@
   }
   
   public QueryEngine getQueryEngine() {
+    
     return null;
   }
 
   public File getIndexDirectory() {
     return indexDirectory;
   }
+
+  public long getOccurrencesPerBatch() {
+    return occurrencesPerBatch;
+  }
+
+  public int getIndexingQueueSize() {
+    return indexingQueueSize;
+  }
+
+  public void setIndexingQueueSize(int indexingQueueSize) {
+    this.indexingQueueSize = indexingQueueSize;
+  }
+
+  public void setOccurrencesPerBatch(long occurrencesPerBatch) {
+    this.occurrencesPerBatch = occurrencesPerBatch;
+  }
   
+  public DocumentCollection getDocumentCollection() {
+    return documentCollection;
+  }
+
+  /**
+   * Gets the {@link DocumentData} for a given document ID, from the on disk 
+   * document collection. In memory caching is performed to reduce the cost of 
+   * this call. 
+   * @param documentID
+   *          the ID of the document to be obtained.
+   * @return the {@link DocumentData} associated with the given document ID.
+   * @throws IndexException
+   */
+  public synchronized DocumentData getDocumentData(long documentID)
+  throws IndexException {
+    if(isDeleted(documentID)) {
+      throw new IndexException("Invalid document ID " + documentID);
+    }
+    DocumentData documentData = documentCache.getAndMoveToFirst(documentID);
+    if(documentData == null) {
+      // cache miss
+      documentData = documentCollection.getDocumentData(documentID);
+      documentCache.putAndMoveToFirst(documentID, documentData);
+      if(documentCache.size() > DOCUMENT_DATA_CACHE_SIZE) {
+        documentCache.removeLast();
+      }
+    }
+    return documentData;
+  }
+  
+  /**
+   * Marks a given document (identified by its ID) as deleted. Deleted 
documents
+   * are never returned as search results.
+   * @param documentId
+   */
+  public void deleteDocument(long documentId) {
+    if(deletedDocumentIds.add(documentId)) {
+      writeDeletedDocsLater();
+    }
+  }
+
+  /**
+   * Marks the given batch of documents (identified by ID) as deleted. Deleted
+   * documents are never returned as search results.
+   * @param documentIds
+   */
+  public void deleteDocuments(Collection<? extends Number> documentIds) {
+    List<Long> idsToDelete = new ArrayList<Long>(documentIds.size());
+    for(Number n : documentIds) {
+      idsToDelete.add(Long.valueOf(n.longValue()));
+    }
+    if(deletedDocumentIds.addAll(idsToDelete)) {
+      writeDeletedDocsLater();
+    }
+  }
+  
+  /**
+   * Checks whether a given document (specified by its ID) is marked as 
deleted. 
+   * @param documentId
+   * @return
+   */
+  public boolean isDeleted(long documentId) {
+    return deletedDocumentIds.contains(documentId);
+  }
+  
+  /**
+   * Mark the given document (identified by ID) as <i>not</i> deleted.  Calling
+   * this method for a document ID that is not currently marked as deleted has
+   * no effect.
+   */
+  public void undeleteDocument(long documentId) {
+    if(deletedDocumentIds.remove(documentId)) {
+      writeDeletedDocsLater();
+    }
+  }
+  
+  /**
+   * Mark the given documents (identified by ID) as <i>not</i> deleted.  
Calling
+   * this method for a document ID that is not currently marked as deleted has
+   * no effect.
+   */
+  public void undeleteDocuments(Collection<? extends Number> documentIds) {
+    List<Long> idsToUndelete = new ArrayList<Long>(documentIds.size());
+    for(Number n : documentIds) {
+      idsToUndelete.add(Long.valueOf(n.longValue()));
+    }
+    if(deletedDocumentIds.removeAll(idsToUndelete)) {
+      writeDeletedDocsLater();
+    }
+  }
+  
+  /**
+   * Writes the set of deleted document to disk in a background thread, after a
+   * short delay. If a previous request has not started yet, this new request 
+   * will replace it. 
+   */
+  protected void writeDeletedDocsLater() {
+    synchronized(writeDeletedDocsTimer) {
+      if(writeDeletedDocsTask != null) {
+        writeDeletedDocsTask.cancel();
+      }
+      writeDeletedDocsTask = new WriteDeletedDocsTask();
+      writeDeletedDocsTimer.schedule(writeDeletedDocsTask, 1000);
+    }
+  }
+  
+  /**
+   * Reads the list of deleted documents from disk. 
+   */
+  @SuppressWarnings("unchecked")
+  protected synchronized void readDeletedDocs() throws IOException{
+    deletedDocumentIds = Collections.synchronizedSortedSet(
+            new TreeSet<Long>());
+    File delFile = new File(indexDirectory, DELETED_DOCUMENT_IDS_FILE_NAME);
+    if(delFile.exists()) {
+      try {
+        ObjectInputStream ois = new ObjectInputStream(
+                new GZIPInputStream(
+                new BufferedInputStream(
+                new FileInputStream(delFile))));
+        // an old index will have saved a Set<Integer>, a new one will be
+        // Set<Long>
+        Set<? extends Number> savedSet = (Set<? extends 
Number>)ois.readObject();
+        for(Number n : savedSet) {
+          deletedDocumentIds.add(Long.valueOf(n.longValue()));
+        }
+      } catch(ClassNotFoundException e) {
+        // this should never happen
+        throw new RuntimeException(e);
+      }
+    }
+  }
 }

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 
2014-01-14 17:25:33 UTC (rev 17226)
@@ -75,6 +75,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -94,6 +97,50 @@
 public abstract class AtomicIndex implements Runnable {
   
   /**
+   * A custom implementation of Future, allowing us to execute the work in the
+   * indexing thread, and report its result.
+   *
+   * @param <T>
+   */
+  protected static class CustomFuture<T> extends FutureTask<T> {
+
+    public CustomFuture(Callable<T> operation) {
+      super(operation);
+    }
+
+    @Override
+    protected void done() {
+      super.done();
+    }
+
+    @Override
+    protected void set(T v) {
+      super.set(v);
+    }
+
+    @Override
+    protected void setException(Throwable t) {
+      super.setException(t);
+    }
+    
+    
+  }
+  
+  private static final Callable<Long> noOpLong = new Callable<Long>() {
+    @Override
+    public Long call() throws Exception {
+      return null;
+    }
+  };
+  
+  private static final Callable<Void> noOpVoid = new Callable<Void>() {
+    @Override
+    public Void call() throws Exception {
+      return null;
+    }
+  };
+  
+  /**
    * An in-RAM representation of a postings list
    */
   protected static class PostingsList {
@@ -404,15 +451,21 @@
    */
   public static final String DOCUMENTS_QUEUE_FILE_NAME = "queued-documents";
   
+  /** The initial size of the term map. */
+  private static final int INITIAL_TERM_MAP_SIZE = 1024;
+  
   /**
-   * How many occurrences to index in each batch. This metric is more 
reliable, 
-   * than document counts, as it does not depend on average document size. 
+   * A marker value that gets queued to indicate a request to
+   * write the in-RAM data to a new index batch.
    */
-  public static final int DEFAULT_OCCURRENCES_PER_BATCH = 20 * 1000 * 1000;
+  private static final GATEDocument DUMP_BATCH = new GATEDocument(){};
+
+  /**
+   * A marker value that gets queued to indicate a request to combine all the
+   * on-disk batches into a new head.
+   */
+  private static final GATEDocument COMPRESS_INDEX = new GATEDocument(){};
   
-  /** The initial size of the term map. */
-  private static final int INITIAL_TERM_MAP_SIZE = 1024;
-  
   private static Logger logger = Logger.getLogger(AtomicIndex.class);
   
   protected static final PatternFilenameFilter TAILS_FILENAME_FILTER = 
@@ -441,11 +494,7 @@
    */
   protected long occurrencesInRAM = 0;
   
-  /**
-   * How many occurrences to be accumulated in RAM before a new tail batch is
-   * written to disk.
-   */
-  protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
+
   
   /**
    * The {@link MimirIndex} that this atomic index is a member of.
@@ -519,19 +568,19 @@
   
   /**
    * If a request was made to compress the index (combine all sub-indexes 
-   * into a new head) this flag will be set to true. The operation will be 
+   * into a new head) this value will be non-null. The operation will be 
    * performed on the indexing thread at the first opportunity. At that point 
-   * the flag will be reset to false.
+   * this future will complete, and the value will be set back to null.
    */
-  protected volatile boolean indexCompressionRequested = false;
+  protected CustomFuture<Void> indexCompressionRequested;
   
   /**
-   * If a request was made to write the in-RAM index data to disk this flag 
-   * will be set to true. The operation will be performed on the indexing
-   * thread at the first opportunity.  At that point the flag will be reset to 
-   * false.  
+   * If a request was made to write the in-RAM index data to disk this value 
+   * will be not null. The operation will be performed on the indexing
+   * thread at the first opportunity.  At that point the Future will complete, 
+   * and the value will be set back to null.
    */
-  protected volatile boolean tailWriteRequested = false;
+  protected CustomFuture<Long> tailWriteRequested;
   
   /**
    * Creates a new AtomicIndex
@@ -770,12 +819,11 @@
     }
 
          if(hasDirectIndex) {
+           //TODO
            // dump new direct tail (invert the tail just written)
            // merge new direct tail into direct index cluster
          }
          
-         // clear queued-documents folder
-       
          // clear out internal state, in preparation for the next tail  
          newBatch();
        }
@@ -874,19 +922,37 @@
        /**
         * Instructs this index to dump to disk all the in-RAM index data at 
the fist 
         * opportunity.
+        * @return a future value that, upon completion, will return the number 
of
+        * occurrences written to disk.
         */
-       public void requestDumpToDisk() {
-         tailWriteRequested = true;
+       public synchronized Future<Long> requestDumpToDisk() {
+         if(tailWriteRequested == null) {
+           tailWriteRequested = new CustomFuture<Long>(noOpLong);
+         }
+   try {
+      inputQueue.put(DUMP_BATCH);
+    } catch(InterruptedException e) {
+      tailWriteRequested.setException(e);
+    }
+         return tailWriteRequested;
        }
        
-       public void requestIndexCompression() {
-        indexCompressionRequested = true;
+       public synchronized Future<Void> requestIndexCompression() {
+         if(indexCompressionRequested == null) {
+           indexCompressionRequested = new CustomFuture<Void>(noOpVoid);
+         }
+          try {
+             inputQueue.put(COMPRESS_INDEX);
+           } catch(InterruptedException e) {
+             indexCompressionRequested.setException(e);
+           }
+        return indexCompressionRequested;
        }
        
        /**
         * Opens one sub-index, specified as a directory inside this Atom 
Index's
         * index directory.
-        * @param indexDir
+        * @param subIndexDirname
         * @return
         * @throws IOException 
         * @throws IndexException 
@@ -944,27 +1010,43 @@
         do{
           aDocument = inputQueue.take();
           if(aDocument != GATEDocument.END_OF_QUEUE) {
-            try {
-              processDocument(aDocument);
-            } catch(Throwable e) {
-              logger.error("Problem while indexing document!", e);
-            }
-            //dump batch if needed AND there is data to dump
-            if (occurrencesPerBatch > 0 && occurrencesInRAM > 
occurrencesPerBatch){
+            if(aDocument == DUMP_BATCH) {
+              //dump batch was requested
+              long occurencestToWrite = occurrencesInRAM;
               writeCurrentTail();
-            }
-            if(indexCompressionRequested) {
+              synchronized(this) {
+                if(tailWriteRequested != null) {
+                  tailWriteRequested.set(occurencestToWrite);
+                  tailWriteRequested.done();
+                  tailWriteRequested = null;
+                }              
+              }              
+            } else if(aDocument == COMPRESS_INDEX) {
+              // compress index was requested
               compressIndex();
+              synchronized(this) {
+                if(indexCompressionRequested != null) {
+                  indexCompressionRequested.done();
+                  indexCompressionRequested = null;
+                }
+              }              
+            } else {
+              try {
+                long occurencesBefore = occurrencesInRAM;
+                processDocument(aDocument);
+                parent.addOccurrences(occurrencesInRAM - occurencesBefore);
+              } catch(Throwable e) {
+                logger.error("Problem while indexing document!", e);
+              }              
             }
-            if(tailWriteRequested) {
-              writeCurrentTail();
-            }            
           } else {
             // close down
             writeCurrentTail();
             flush();
           }
-          outputQueue.put(aDocument);
+          if(aDocument != DUMP_BATCH && aDocument != COMPRESS_INDEX) {
+            outputQueue.put(aDocument);  
+          }
         } while(aDocument != GATEDocument.END_OF_QUEUE);
          }
          }catch(InterruptedException e) {
@@ -980,9 +1062,7 @@
         * Closes all file-based resources.
         * @throws IOException
         */
-       protected void flush() throws IOException {
-         
-       };
+       abstract protected void flush() throws IOException;
        
        /**
         * Notifies this index to stop its indexing operations, and waits for 
all data
@@ -1047,9 +1127,15 @@
   protected abstract String[] calculateTermStringForAnnotation(Annotation ann,
           GATEDocument gateDocument) throws IndexException;
   
+  /**
+   * Adds the supplied document to the in-RAM index.
+   * @param gateDocument the document to index
+   * @throws IndexException
+   */
   protected void processDocument(GATEDocument gateDocument) throws 
IndexException{
     //zero document related counters
     tokenPosition = 0;
+    
     documentStarting(gateDocument);
     //get the annotations to be processed
     Annotation[] annotsToProcess = getAnnotsToProcess(gateDocument);
@@ -1079,7 +1165,6 @@
       int docLength = tokenPosition + 1;
       if(docLength > maxDocSizeInRAM) maxDocSizeInRAM = docLength;
       documentSizesInRAM.add(docLength);
-      
     } catch (IOException e) {
       throw new IndexException("IO Exception while indexing", e);
     }finally {
@@ -1113,14 +1198,7 @@
     }
   }
 
-  public long getOccurrencesPerBatch() {
-    return occurrencesPerBatch;
-  }
 
-  public void setOccurrencesPerBatch(long occurrencesPerBatch) {
-    this.occurrencesPerBatch = occurrencesPerBatch;
-  }
-
   public File getIndexDirectory() {
     return indexDirectory;
   }

Modified: 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java    
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java    
2014-01-14 17:25:33 UTC (rev 17226)
@@ -21,6 +21,7 @@
 import gate.mimir.IndexConfig.TokenIndexerConfig;
 import gate.mimir.index.mg4j.GATEDocument;
 import gate.mimir.index.mg4j.GATEDocumentFactory;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollection;
 import gate.mimir.index.mg4j.zipcollection.DocumentCollectionWriter;
 import gate.mimir.index.mg4j.zipcollection.DocumentData;
 import it.unimi.di.big.mg4j.index.Index;
@@ -51,27 +52,11 @@
   private static final String[] DO_NOT_INDEX = new String[]{};
   
   /**
-   * A zip collection builder used to build a zip of the collection
-   * if this has been requested.
+   * Is this token index responsible for writing the zip collection?
    */
-  protected DocumentCollectionWriter collectionWriter = null;
+  protected boolean zipCollectionEnabled = false;
   
   /**
-   * An array of helpers for creating document metadata. 
-   */
-  protected DocumentMetadataHelper[] docMetadataHelpers;
-  
-  /**
-   * Stores the document URI for writing to the zip collection;
-   */
-  protected String documentURI;
-  
-  /**
-   * Stores the document title for writing to the zip collection. 
-   */
-  protected String documentTitle;
-  
-  /**
    * Stores the document tokens for writing to the zip collection;
    */
   protected List<String> documentTokens;
@@ -81,6 +66,10 @@
    */
   protected List<String> documentNonTokens;
   
+  /**
+   * An array of helpers for creating document metadata. 
+   */
+  protected DocumentMetadataHelper[] docMetadataHelpers;
   
   /**
    * GATE document factory used by the zip builder, and also to
@@ -111,11 +100,13 @@
     super(parent, name, indexDirectory, hasDirectIndex, inputQueue, 
outputQueue);
     this.featureName = config.getFeatureName();
     this.termProcessor = config.getTermProcessor();
+    this.zipCollectionEnabled = zipCollection;
+    if(zipCollectionEnabled) {
+      documentTokens = new LinkedList<String>();
+      documentNonTokens = new LinkedList<String>();
+      docMetadataHelpers = parent.getIndexConfig().getDocMetadataHelpers();
+    }
     
-    if(zipCollection) {
-      logger.info("Creating zipped collection for field \"" + name + "\"");
-      collectionWriter = new DocumentCollectionWriter(indexDirectory);
-    }
     // save the term processor
     additionalProperties.setProperty(Index.PropertyKeys.TERMPROCESSOR, 
         ObjectParser.toSpec(termProcessor));
@@ -130,11 +121,8 @@
    * is about to start.
    */
   protected void documentStarting(GATEDocument gateDocument) throws 
IndexException {
-    if(collectionWriter != null) {
-      documentURI = gateDocument.uri().toString();
-      documentTitle = gateDocument.title().toString();
-      documentTokens = new LinkedList<String>();
-      documentNonTokens = new LinkedList<String>();
+    if(zipCollectionEnabled) {
+      // notify the metadata helpers
       if(docMetadataHelpers != null){
         for(DocumentMetadataHelper aHelper : docMetadataHelpers){
           aHelper.documentStart(gateDocument);
@@ -151,19 +139,20 @@
    * the current document.
    */
   protected void documentEnding(GATEDocument gateDocument) throws 
IndexException {
-    if(collectionWriter != null) {
-      DocumentData docData = new DocumentData(documentURI, 
-              documentTitle, 
-              documentTokens.toArray(new String[documentTokens.size()]),
-              documentNonTokens.toArray(new 
String[documentNonTokens.size()])); 
+    if(zipCollectionEnabled) {
+      DocumentData docData = new DocumentData(
+          gateDocument.uri().toString(), 
+          gateDocument.title().toString(),
+          documentTokens.toArray(new String[documentTokens.size()]),
+          documentNonTokens.toArray(new String[documentNonTokens.size()])); 
       if(docMetadataHelpers != null){
         for(DocumentMetadataHelper aHelper : docMetadataHelpers){
           aHelper.documentEnd(gateDocument, docData);
         }
       }
-      collectionWriter.writeDocument(docData);
-      documentTokens = null;
-      documentNonTokens = null;
+      parent.writeZipDocumentData(docData);
+      documentTokens.clear();
+      documentNonTokens.clear();
     }
   }
 
@@ -204,7 +193,7 @@
     String value = (String)tokenFeatures.get(featureName);
     currentTerm.replace(value == null ? "" : value);
     //save the *unprocessed* term to the collection, if required.
-    if(collectionWriter != null) {
+    if(zipCollectionEnabled) {
       documentTokens.add(currentTerm.toString());
       documentNonTokens.add(gateDocument.getNonTokens()[tokenPosition]);
     }
@@ -223,9 +212,5 @@
    */
   @Override
   protected void flush() throws IOException {
-    if(collectionWriter != null) {
-      logger.info("Saving zipped collection");
-      collectionWriter.close();
-    }
-  }  
+  }
 }

Modified: 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java   
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java   
2014-01-14 17:25:33 UTC (rev 17226)
@@ -150,7 +150,7 @@
   /**
    * Private constructor used to create the {@link #END_OF_QUEUE} instance.
    */
-  private GATEDocument(){
+  protected GATEDocument(){
   }
   
   public GATEDocument(gate.Document gateDocument,

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java        
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java        
2014-01-14 17:25:33 UTC (rev 17226)
@@ -19,6 +19,7 @@
 import gate.mimir.DocumentRenderer;
 import gate.mimir.IndexConfig;
 import gate.mimir.IndexConfig.SemanticIndexerConfig;
+import gate.mimir.MimirIndex;
 import gate.mimir.SemanticAnnotationHelper;
 import gate.mimir.index.IndexException;
 import gate.mimir.index.Indexer;
@@ -76,31 +77,7 @@
  */
 public class QueryEngine {
   
-  private class WriteDeletedDocsTask extends TimerTask {
-    public void run() {
-      synchronized(writeDeletedDocsTimer) {
-        File delFile = new File(indexDir, DELETED_DOCUMENT_IDS_FILE_NAME);
-        if(delFile.exists()) {
-          delFile.delete();
-        }
-        try{
-          logger.debug("Writing deleted documents set");
-          ObjectOutputStream oos = new ObjectOutputStream(
-                  new GZIPOutputStream(
-                  new BufferedOutputStream(
-                  new FileOutputStream(delFile))));
-          oos.writeObject(deletedDocumentIds);
-          oos.flush();
-          oos.close();
-          logger.debug("Writing deleted documents set completed.");
-        }catch (IOException e) {
-          logger.error("Exception while writing deleted documents set", e);
-        }        
-      }
-    }
-  }
   
-  
   /**
    * Represents the type of index that should be searched. Mimir uses two types
    * of indexes: token indexes (which index the text input) and annotation
@@ -134,55 +111,12 @@
   protected IndexReaderPool[] directIndexReaderPools;
 
   /**
-   * The zipped document collection from MG4J (built during the indexing of the
-   * first token feature). This can be used to obtain the document text and to
-   * display the content of the hits.
-   */
-  protected DocumentCollection documentCollection;
-
-  /**
-   * A cache of {@link DocumentData} values used for returning the various
-   * document details (title, URI, text).
-   */
-  protected Long2ObjectLinkedOpenHashMap<DocumentData> documentCache;
-
-  /**
-   * The maximum number of documents to be stored in the document cache.
-   */
-  protected static final int DOCUMENT_CACHE_SIZE = 100;
-  
-  /**
-   * The set of IDs for the documents marked as deleted. 
-   */
-  private transient SortedSet<Long> deletedDocumentIds;
-  
-  /**
-   * A timer used to execute the writing of deleted documents data to disk.
-   * This timer is used to create a delay, allowing a batch of writes to be 
-   * coalesced into a single one.
-   */
-  private transient Timer writeDeletedDocsTimer;
-  
-  /**
-   * The timer task used to top write to disk the deleted documents data.
-   * This value is non-null only when there is a pending write. 
-   */
-  private volatile transient WriteDeletedDocsTask writeDeletedDocsTask;
-
-  /**
    * The document sizes used during search time (if running in document mode) 
to
    * simulate document-spanning annotations.
    */
   private transient IntBigList documentSizes;
   
   /**
-   * The name for the file (stored in the root index directory) containing 
-   * the serialised version of the {@link #deletedDocumentIds}. 
-   */
-  public static final String DELETED_DOCUMENT_IDS_FILE_NAME = "deleted.ser";
-  
-  
-  /**
    * The maximum size of an index that can be loaded in memory (by default 64
    * MB).
    */
@@ -194,11 +128,10 @@
    */
   public static final int DEFAULT_DOCUMENT_BLOCK_SIZE = 1000;
 
-  
   /**
-   * The top level directory of the Mimir index.
+   * The index being searched.
    */
-  protected File indexDir;
+  protected MimirIndex index;
 
   /**
    * The index configuration this index was built from.
@@ -242,13 +175,6 @@
    * files).
    */
   private List<QueryRunner> activeQueryRunners;
-  
-  /**
-   * @return the indexDir
-   */
-  public File getIndexDir() {
-    return indexDir;
-  }
 
   /**
    * Are sub-bindings used in this query engine. Sub-bindings are used to
@@ -513,7 +439,18 @@
     return indexConfig;
   }
 
+  
+  
   /**
+   * Constructs a new query engine for a {@link MimirIndex}.
+   * @param index the index to be searched.
+   */
+  public QueryEngine(MimirIndex index) {
+    this.index = index;
+    
+  }
+
+  /**
    * Constructs a new {@link QueryEngine} for a specified Mimir index. The 
mimir
    * semantic repository will be initialized using the default location in the
    * filesystem, provided by the IndexConfig
@@ -524,7 +461,6 @@
    *           if there are problems while opening the indexes.
    */
   public QueryEngine(File indexDir) throws gate.mimir.index.IndexException {
-    this.indexDir = indexDir;
     // read the index config
     try {
       indexConfig =
@@ -545,8 +481,8 @@
           }
         }
       }
-      readDeletedDocs();
       
+      
       activeQueryRunners = Collections.synchronizedList(
               new ArrayList<QueryRunner>());
     } catch(FileNotFoundException e) {
@@ -555,7 +491,7 @@
       throw new IndexException("Input/output exception!", e);
     }
     subBindingsEnabled = false;
-    writeDeletedDocsTimer = new Timer("Delete documents writer");
+
   }
 
   /**
@@ -712,7 +648,7 @@
    */
   public String[][] getRightContext(Binding hit, int numTokens)
   throws IndexException {
-    DocumentData docData = getDocumentData(hit.getDocumentId());
+    DocumentData docData = index.getDocumentData(hit.getDocumentId());
     int startOffset = hit.getTermPosition() + hit.getLength();
     if(startOffset >= docData.getTokens().length) {
       // hit is at the end of the document
@@ -751,7 +687,7 @@
    */
   public String[][] getText(long documentID, int termPosition, int length)
   throws IndexException {
-    return getDocumentData(documentID).getText(termPosition, length);
+    return index.getDocumentData(documentID).getText(termPosition, length);
   }
 
   /**
@@ -773,15 +709,15 @@
     DocumentRenderer docRenderer = indexConfig.getDocumentRenderer();
     if(docRenderer == null) { throw new IndexException(
     "No document renderer is configured for this index!"); }
-    docRenderer.render(getDocumentData(docID), hits, output);
+    docRenderer.render(index.getDocumentData(docID), hits, output);
   }
 
   public String getDocumentTitle(long docID) throws IndexException {
-    return getDocumentData(docID).getDocumentTitle();
+    return index.getDocumentData(docID).getDocumentTitle();
   }
 
   public String getDocumentURI(long docID) throws IndexException {
-    return getDocumentData(docID).getDocumentURI();
+    return index.getDocumentData(docID).getDocumentURI();
   }
 
   /**
@@ -798,35 +734,11 @@
    */
   public Serializable getDocumentMetadataField(long docID, String fieldName) 
       throws IndexException {
-    return getDocumentData(docID).getMetadataField(fieldName);
+    return index.getDocumentData(docID).getMetadataField(fieldName);
   }
   
-  /**
-   * Gets the {@link DocumentData} for a given document ID, from the on disk 
-   * document collection. In memory caching is performed to reduce the cost of 
-   * this call. 
-   * @param documentID
-   *          the ID of the document to be obtained.
-   * @return the {@link DocumentData} associated with the given document ID.
-   * @throws IndexException
-   */
-  public synchronized DocumentData getDocumentData(long documentID)
-  throws IndexException {
-    if(isDeleted(documentID)) {
-      throw new IndexException("Invalid document ID " + documentID);
-    }
-    DocumentData documentData = documentCache.getAndMoveToFirst(documentID);
-    if(documentData == null) {
-      // cache miss
-      documentData = documentCollection.getDocumentData(documentID);
-      documentCache.putAndMoveToFirst(documentID, documentData);
-      if(documentCache.size() > DOCUMENT_CACHE_SIZE) {
-        documentCache.removeLast();
-      }
-    }
-    return documentData;
-  }
 
+
   /**
    * Closes this {@link QueryEngine} and releases all resources.
    */
@@ -842,8 +754,7 @@
         logger.error("Exception while closing query runner.", e);
       }
     }
-    // close the document collection
-    documentCollection.close();
+
     // close all the semantic indexers
     logger.info("Closing Semantic Annotation Helpers.");
     if(indexConfig.getSemanticIndexers() != null) {
@@ -856,16 +767,6 @@
         }
       }
     }
-    // write the deleted documents set
-    synchronized(writeDeletedDocsTimer) {
-      if(writeDeletedDocsTask != null) {
-        writeDeletedDocsTask.cancel();
-      }
-      writeDeletedDocsTimer.cancel();
-      // explicitly call it one last time
-      new WriteDeletedDocsTask().run();
-    }
-    documentCache.clear();
     for(IndexReaderPool aPool : indexReaderPools) {
       try {
         aPool.close();
@@ -904,7 +805,8 @@
         indexConfig.getSemanticIndexers().length];
     
     try {
-      File mg4JIndexDir = new File(indexDir, Indexer.MG4J_INDEX_DIRNAME);
+//      File mg4JIndexDir = new File(indexDir, Indexer.MG4J_INDEX_DIRNAME);
+      File mg4JIndexDir = null;
       // Load the token indexes
       for(int i = 0; i < indexConfig.getTokenIndexers().length; i++) {
         File indexBasename =
@@ -938,10 +840,8 @@
               openOneSubIndex(indexBasename.toURI());
         }
       }
-      // open the zipped document collection
-      documentCollection = new DocumentCollection(indexDir);
-      // prepare the document cache
-      documentCache = new Long2ObjectLinkedOpenHashMap<DocumentData>();
+
+
     } catch(IOException e) {
       // IOException gets thrown upward
       throw e;
@@ -995,106 +895,5 @@
         MG4JTools.upgradeIndex(indexUri);
       }
   
-  /**
-   * Marks a given document (identified by its ID) as deleted. Deleted 
documents
-   * are never returned as search results.
-   * @param documentId
-   */
-  public void deleteDocument(long documentId) {
-    if(deletedDocumentIds.add(documentId)) {
-      writeDeletedDocsLater();
-    }
-  }
 
-  /**
-   * Marks the given batch of documents (identified by ID) as deleted. Deleted
-   * documents are never returned as search results.
-   * @param documentIds
-   */
-  public void deleteDocuments(Collection<? extends Number> documentIds) {
-    List<Long> idsToDelete = new ArrayList<Long>(documentIds.size());
-    for(Number n : documentIds) {
-      idsToDelete.add(Long.valueOf(n.longValue()));
-    }
-    if(deletedDocumentIds.addAll(idsToDelete)) {
-      writeDeletedDocsLater();
-    }
-  }
-  
-  /**
-   * Checks whether a given document (specified by its ID) is marked as 
deleted. 
-   * @param documentId
-   * @return
-   */
-  public boolean isDeleted(long documentId) {
-    return deletedDocumentIds.contains(documentId);
-  }
-  
-  /**
-   * Mark the given document (identified by ID) as <i>not</i> deleted.  Calling
-   * this method for a document ID that is not currently marked as deleted has
-   * no effect.
-   */
-  public void undeleteDocument(long documentId) {
-    if(deletedDocumentIds.remove(documentId)) {
-      writeDeletedDocsLater();
-    }
-  }
-  
-  /**
-   * Mark the given documents (identified by ID) as <i>not</i> deleted.  
Calling
-   * this method for a document ID that is not currently marked as deleted has
-   * no effect.
-   */
-  public void undeleteDocuments(Collection<? extends Number> documentIds) {
-    List<Long> idsToUndelete = new ArrayList<Long>(documentIds.size());
-    for(Number n : documentIds) {
-      idsToUndelete.add(Long.valueOf(n.longValue()));
-    }
-    if(deletedDocumentIds.removeAll(idsToUndelete)) {
-      writeDeletedDocsLater();
-    }
-  }
-  
-  /**
-   * Writes the set of deleted document to disk in a background thread, after a
-   * short delay. If a previous request has not started yet, this new request 
-   * will replace it. 
-   */
-  protected void writeDeletedDocsLater() {
-    synchronized(writeDeletedDocsTimer) {
-      if(writeDeletedDocsTask != null) {
-        writeDeletedDocsTask.cancel();
-      }
-      writeDeletedDocsTask = new WriteDeletedDocsTask();
-      writeDeletedDocsTimer.schedule(writeDeletedDocsTask, 1000);
-    }
-  }
-  
-  /**
-   * Reads the list of deleted documents from disk. 
-   */
-  @SuppressWarnings("unchecked")
-  protected synchronized void readDeletedDocs() throws IOException{
-    deletedDocumentIds = Collections.synchronizedSortedSet(
-            new TreeSet<Long>());
-    File delFile = new File(indexDir, DELETED_DOCUMENT_IDS_FILE_NAME);
-    if(delFile.exists()) {
-      try {
-        ObjectInputStream ois = new ObjectInputStream(
-                new GZIPInputStream(
-                new BufferedInputStream(
-                new FileInputStream(delFile))));
-        // an old index will have saved a Set<Integer>, a new one will be
-        // Set<Long>
-        Set<? extends Number> savedSet = (Set<? extends 
Number>)ois.readObject();
-        for(Number n : savedSet) {
-          deletedDocumentIds.add(Long.valueOf(n.longValue()));
-        }
-      } catch(ClassNotFoundException e) {
-        // this should never happen
-        throw new RuntimeException(e);
-      }
-    }
-  }
 }

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
CenturyLink Cloud: The Leader in Enterprise Cloud Services.
Learn Why More Businesses Are Choosing CenturyLink Cloud For
Critical Workloads, Development Environments & Everything In Between.
Get a Quote or Start a Free Trial Today. 
http://pubads.g.doubleclick.net/gampad/clk?id=119420431&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to