Revision: 17226
http://sourceforge.net/p/gate/code/17226
Author: valyt
Date: 2014-01-14 17:25:33 +0000 (Tue, 14 Jan 2014)
Log Message:
-----------
Added a light dusting of synchronization to make sure all sub-indexes break the
document stream into batches at the same location. This is needed to support
live-searchable indexes, as the maximum document pointer must be the same in
all sub-indexes.
Moved the zip collection functionality to MimirIndex.
Moved the document deletion functionality to MimirIndex.
Started work on supporting search for Mímir 5.0 indexes.
Modified Paths:
--------------
mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java
mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2014-01-14 17:25:33 UTC (rev 17226)
@@ -24,15 +24,38 @@
import gate.mimir.index.IndexException;
import gate.mimir.index.mg4j.GATEDocument;
import gate.mimir.index.mg4j.MimirIndexBuilder;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollection;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollectionWriter;
+import gate.mimir.index.mg4j.zipcollection.DocumentData;
import gate.mimir.search.QueryEngine;
import gate.util.GateRuntimeException;
+import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import org.apache.log4j.Logger;
@@ -58,16 +81,65 @@
*/
public static final String INDEX_CONFIG_FILENAME = "config.xml";
+ /**
+ * The name for the file (stored in the root index directory) containing
+ * the serialised version of the {@link #deletedDocumentIds}.
+ */
+ public static final String DELETED_DOCUMENT_IDS_FILE_NAME = "deleted.ser";
+
+ /**
+ * How many occurrences to index in each batch. This metric is more
reliable,
+ * than document counts, as it does not depend on average document size.
+ */
+ public static final int DEFAULT_OCCURRENCES_PER_BATCH = 100 * 1000 * 1000;
+
+ /**
+ * The default length for the buffer input / output queues for sub-indexers.
+ */
+ public static final int DEFAULT_INDEXING_QUEUE_SIZE = 30;
+
+ /**
+ * The maximum number of documents to be stored in the document cache.
+ */
+ protected static final int DOCUMENT_DATA_CACHE_SIZE = 100;
+
+ /**
+ * How many occurrences to be accumulated in RAM before a new tail batch is
+ * written to disk.
+ */
+ protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
+
private static final Logger logger = Logger.getLogger(MimirIndex.class);
/**
- * A {@link Runnable} that collects the documents from the sub-indexers and
- * deletes them from GATE.
+ * A {@link Runnable} that performs various index maintenance in a separate
+ * thread. The principal role is to collect the documents from the
+ * sub-indexers and delete them from GATE.
*/
- protected class DocumentCollector implements Runnable{
+ protected class IndexMaintenanceRunner implements Runnable{
public void run(){
boolean finished = false;
while(!finished){
+ // if any batches have finished writing, then decrease the
+ // occurrencesInRam value
+ Iterator<Future<Long>> futureIter = batchesBeingDumped.iterator();
+ while(futureIter.hasNext()) {
+ Future<Long> aFuture = futureIter.next();
+ try {
+ if(aFuture.isDone()) {
+ occurrencesInRam -= aFuture.get();
+ futureIter.remove();
+ }
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch(ExecutionException e) {
+ // we never set an exception on these futures, so this should never
+ // happen
+ logger.error(
+ "Got unexpected exception from future doing batch dump", e);
+ }
+ }
+
GATEDocument currentDocument = null;
try {
//get one document from each of the sub-indexers
@@ -103,6 +175,31 @@
}
}
+
+ private class WriteDeletedDocsTask extends TimerTask {
+ public void run() {
+ synchronized(writeDeletedDocsTimer) {
+ File delFile = new File(indexDirectory,
DELETED_DOCUMENT_IDS_FILE_NAME);
+ if(delFile.exists()) {
+ delFile.delete();
+ }
+ try{
+ logger.debug("Writing deleted documents set");
+ ObjectOutputStream oos = new ObjectOutputStream(
+ new GZIPOutputStream(
+ new BufferedOutputStream(
+ new FileOutputStream(delFile))));
+ oos.writeObject(deletedDocumentIds);
+ oos.flush();
+ oos.close();
+ logger.debug("Writing deleted documents set completed.");
+ }catch (IOException e) {
+ logger.error("Exception while writing deleted documents set", e);
+ }
+ }
+ }
+ }
+
/**
* The {@link IndexConfig} used for this index.
*/
@@ -114,14 +211,56 @@
protected File indexDirectory;
/**
+ * A zip collection builder used to build a zip of the collection
+ * if this has been requested.
+ */
+ protected DocumentCollectionWriter collectionWriter = null;
+
+ /**
+ * The zipped document collection from MG4J (built during the indexing of the
+ * first token feature). This can be used to obtain the document text and to
+ * display the content of the hits.
+ */
+ protected DocumentCollection documentCollection;
+
+ /**
+ * A cache of {@link DocumentData} values used for returning the various
+ * document details (title, URI, text).
+ */
+ protected Long2ObjectLinkedOpenHashMap<DocumentData> documentCache;
+
+ /**
* The thread used to clean-up GATE documents after they have been indexed.
*/
protected Thread documentsCollectorThread;
+
+ /**
+ * A (synchronized) collection of futures that work on saving batches.
+ */
+ protected List<Future<Long>> batchesBeingDumped;
+
protected volatile boolean closed = false;
+ /**
+ * The set of IDs for the documents marked as deleted.
+ */
+ private transient SortedSet<Long> deletedDocumentIds;
/**
+ * A timer used to execute the writing of deleted documents data to disk.
+ * This timer is used to create a delay, allowing a batch of writes to be
+ * coalesced into a single one.
+ */
+ private transient Timer writeDeletedDocsTimer;
+
+ /**
+ * The timer task used to top write to disk the deleted documents data.
+ * This value is non-null only when there is a pending write.
+ */
+ private volatile transient WriteDeletedDocsTask writeDeletedDocsTask;
+
+ /**
* The token indexes, in the order they are listed in the {@link
#indexConfig}.
*/
protected AtomicTokenIndex[] tokenIndexes;
@@ -137,12 +276,21 @@
*/
protected AtomicIndex[] subIndexes;
+ protected int indexingQueueSize = DEFAULT_INDEXING_QUEUE_SIZE;
/**
+ * The total number of occurrences in all sub-indexes that have not yet been
+ * written to disk.
+ */
+ protected long occurrencesInRam;
+
+ /**
* Create a new Index.
* @param indexConfig the configuration for the index.
+ * @throws IOException
+ * @throws IndexException
*/
- public MimirIndex(IndexConfig indexConfig) {
+ public MimirIndex(IndexConfig indexConfig) throws IOException,
IndexException {
this.indexConfig = indexConfig;
this.indexDirectory = this.indexConfig.getIndexDirectory();
@@ -175,8 +323,13 @@
* Opens the index files, if any, prepares all the sub-indexers specified in
* the index config, and gets this index ready to start indexing documents
and
* answer queries.
+ * @throws IOException
+ * @throws IndexException
*/
- protected void openIndex() {
+ protected void openIndex() throws IOException, IndexException {
+ // ####################
+ // Prepare for indexing
+ // ####################
// read the index config and create the sub-indexers
TokenIndexerConfig tokConfs[] = indexConfig.getTokenIndexers();
tokenIndexes = new AtomicTokenIndex[tokConfs.length];
@@ -187,8 +340,8 @@
subIndexname,
new File(indexDirectory, subIndexname),
tokConfs[i].isDirectIndexEnabled(),
- new LinkedBlockingQueue<GATEDocument>(),
- new LinkedBlockingQueue<GATEDocument>(),
+ new LinkedBlockingQueue<GATEDocument>(indexingQueueSize),
+ new LinkedBlockingQueue<GATEDocument>(indexingQueueSize),
tokConfs[i],
i == 0);
}
@@ -213,9 +366,32 @@
System.arraycopy(mentionIndexes, 0, subIndexes, tokenIndexes.length,
mentionIndexes.length);
- // start the collector thread
- documentsCollectorThread = new Thread(new DocumentCollector());
+ occurrencesInRam = 0;
+
+ batchesBeingDumped = Collections.synchronizedList(new
LinkedList<Future<Long>>());
+
+ // start the documents collector thread
+ documentsCollectorThread = new Thread(new IndexMaintenanceRunner());
documentsCollectorThread.start();
+
+ // prepare the zip collection writer
+ logger.info("Creating zipped collection for field \"" +
+ tokConfs[0].getFeatureName() + "\"");
+ collectionWriter = new DocumentCollectionWriter(indexDirectory);
+
+ // #####################
+ // Prepare for searching
+ // #####################
+ readDeletedDocs();
+ // start the timer that regularly writes the deleted documents list
+ writeDeletedDocsTimer = new Timer("Delete documents writer");
+
+ // open the zipped document collection
+ documentCollection = new DocumentCollection(indexDirectory);
+
+ // prepare the document cache
+ documentCache = new Long2ObjectLinkedOpenHashMap<DocumentData>();
+
}
/**
@@ -228,13 +404,43 @@
public void indexDocument(Document document) throws InterruptedException {
if(closed) throw new IllegalStateException("This index has been closed, "
+ "no further documents can be indexed.");
+
+ // check if we need to write a new batch:
+ // we have too many occurrences and
+ // there are no outstanding batch writing operations
+ if(occurrencesInRam > occurrencesPerBatch && batchesBeingDumped.isEmpty())
{
+ for(AtomicIndex aSubIndex: subIndexes){
+ batchesBeingDumped.add(aSubIndex.requestDumpToDisk());
+ }
+ }
GATEDocument gDocument = new GATEDocument(document, indexConfig);
for(AtomicIndex aSubIndex: subIndexes){
aSubIndex.getInputQueue().put(gDocument);
}
+
}
+
+ /**
+ * Notifies this index that more occurrences have been stored in RAM by one
of
+ * its sub-indexes.
+ *
+ * @param occurrences
+ */
+ synchronized public void addOccurrences(long occurrences) {
+ occurrencesInRam += occurrences;
+ }
/**
+ * Called by the first token indexer when a new document has been indexed
+ * to ask the main index to save the necessary zip collection data
+ * @param gDocument
+ * @throws IndexException
+ */
+ public void writeZipDocumentData(DocumentData docData) throws IndexException
{
+ collectionWriter.writeDocument(docData);
+ }
+
+ /**
* Stops this index from accepting any further document for indexing, stops
* this index from accepting any more queries, finishes indexing all the
* currently queued documents, writes all the files to disk, and returns.
@@ -247,6 +453,20 @@
aSubIndex.getInputQueue().put(GATEDocument.END_OF_QUEUE);
}
}
+ // write the deleted documents set
+ synchronized(writeDeletedDocsTimer) {
+ if(writeDeletedDocsTask != null) {
+ writeDeletedDocsTask.cancel();
+ }
+ writeDeletedDocsTimer.cancel();
+ // explicitly call it one last time
+ new WriteDeletedDocsTask().run();
+ }
+ // close the document collection
+ documentCollection.close();
+
+ documentCache.clear();
+
// wait for indexing to end
documentsCollectorThread.join();
}
@@ -256,11 +476,160 @@
}
public QueryEngine getQueryEngine() {
+
return null;
}
public File getIndexDirectory() {
return indexDirectory;
}
+
+ public long getOccurrencesPerBatch() {
+ return occurrencesPerBatch;
+ }
+
+ public int getIndexingQueueSize() {
+ return indexingQueueSize;
+ }
+
+ public void setIndexingQueueSize(int indexingQueueSize) {
+ this.indexingQueueSize = indexingQueueSize;
+ }
+
+ public void setOccurrencesPerBatch(long occurrencesPerBatch) {
+ this.occurrencesPerBatch = occurrencesPerBatch;
+ }
+ public DocumentCollection getDocumentCollection() {
+ return documentCollection;
+ }
+
+ /**
+ * Gets the {@link DocumentData} for a given document ID, from the on disk
+ * document collection. In memory caching is performed to reduce the cost of
+ * this call.
+ * @param documentID
+ * the ID of the document to be obtained.
+ * @return the {@link DocumentData} associated with the given document ID.
+ * @throws IndexException
+ */
+ public synchronized DocumentData getDocumentData(long documentID)
+ throws IndexException {
+ if(isDeleted(documentID)) {
+ throw new IndexException("Invalid document ID " + documentID);
+ }
+ DocumentData documentData = documentCache.getAndMoveToFirst(documentID);
+ if(documentData == null) {
+ // cache miss
+ documentData = documentCollection.getDocumentData(documentID);
+ documentCache.putAndMoveToFirst(documentID, documentData);
+ if(documentCache.size() > DOCUMENT_DATA_CACHE_SIZE) {
+ documentCache.removeLast();
+ }
+ }
+ return documentData;
+ }
+
+ /**
+ * Marks a given document (identified by its ID) as deleted. Deleted
documents
+ * are never returned as search results.
+ * @param documentId
+ */
+ public void deleteDocument(long documentId) {
+ if(deletedDocumentIds.add(documentId)) {
+ writeDeletedDocsLater();
+ }
+ }
+
+ /**
+ * Marks the given batch of documents (identified by ID) as deleted. Deleted
+ * documents are never returned as search results.
+ * @param documentIds
+ */
+ public void deleteDocuments(Collection<? extends Number> documentIds) {
+ List<Long> idsToDelete = new ArrayList<Long>(documentIds.size());
+ for(Number n : documentIds) {
+ idsToDelete.add(Long.valueOf(n.longValue()));
+ }
+ if(deletedDocumentIds.addAll(idsToDelete)) {
+ writeDeletedDocsLater();
+ }
+ }
+
+ /**
+ * Checks whether a given document (specified by its ID) is marked as
deleted.
+ * @param documentId
+ * @return
+ */
+ public boolean isDeleted(long documentId) {
+ return deletedDocumentIds.contains(documentId);
+ }
+
+ /**
+ * Mark the given document (identified by ID) as <i>not</i> deleted. Calling
+ * this method for a document ID that is not currently marked as deleted has
+ * no effect.
+ */
+ public void undeleteDocument(long documentId) {
+ if(deletedDocumentIds.remove(documentId)) {
+ writeDeletedDocsLater();
+ }
+ }
+
+ /**
+ * Mark the given documents (identified by ID) as <i>not</i> deleted.
Calling
+ * this method for a document ID that is not currently marked as deleted has
+ * no effect.
+ */
+ public void undeleteDocuments(Collection<? extends Number> documentIds) {
+ List<Long> idsToUndelete = new ArrayList<Long>(documentIds.size());
+ for(Number n : documentIds) {
+ idsToUndelete.add(Long.valueOf(n.longValue()));
+ }
+ if(deletedDocumentIds.removeAll(idsToUndelete)) {
+ writeDeletedDocsLater();
+ }
+ }
+
+ /**
+ * Writes the set of deleted document to disk in a background thread, after a
+ * short delay. If a previous request has not started yet, this new request
+ * will replace it.
+ */
+ protected void writeDeletedDocsLater() {
+ synchronized(writeDeletedDocsTimer) {
+ if(writeDeletedDocsTask != null) {
+ writeDeletedDocsTask.cancel();
+ }
+ writeDeletedDocsTask = new WriteDeletedDocsTask();
+ writeDeletedDocsTimer.schedule(writeDeletedDocsTask, 1000);
+ }
+ }
+
+ /**
+ * Reads the list of deleted documents from disk.
+ */
+ @SuppressWarnings("unchecked")
+ protected synchronized void readDeletedDocs() throws IOException{
+ deletedDocumentIds = Collections.synchronizedSortedSet(
+ new TreeSet<Long>());
+ File delFile = new File(indexDirectory, DELETED_DOCUMENT_IDS_FILE_NAME);
+ if(delFile.exists()) {
+ try {
+ ObjectInputStream ois = new ObjectInputStream(
+ new GZIPInputStream(
+ new BufferedInputStream(
+ new FileInputStream(delFile))));
+ // an old index will have saved a Set<Integer>, a new one will be
+ // Set<Long>
+ Set<? extends Number> savedSet = (Set<? extends
Number>)ois.readObject();
+ for(Number n : savedSet) {
+ deletedDocumentIds.add(Long.valueOf(n.longValue()));
+ }
+ } catch(ClassNotFoundException e) {
+ // this should never happen
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2014-01-14 17:25:33 UTC (rev 17226)
@@ -75,6 +75,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -94,6 +97,50 @@
public abstract class AtomicIndex implements Runnable {
/**
+ * A custom implementation of Future, allowing us to execute the work in the
+ * indexing thread, and report its result.
+ *
+ * @param <T>
+ */
+ protected static class CustomFuture<T> extends FutureTask<T> {
+
+ public CustomFuture(Callable<T> operation) {
+ super(operation);
+ }
+
+ @Override
+ protected void done() {
+ super.done();
+ }
+
+ @Override
+ protected void set(T v) {
+ super.set(v);
+ }
+
+ @Override
+ protected void setException(Throwable t) {
+ super.setException(t);
+ }
+
+
+ }
+
+ private static final Callable<Long> noOpLong = new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ return null;
+ }
+ };
+
+ private static final Callable<Void> noOpVoid = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ return null;
+ }
+ };
+
+ /**
* An in-RAM representation of a postings list
*/
protected static class PostingsList {
@@ -404,15 +451,21 @@
*/
public static final String DOCUMENTS_QUEUE_FILE_NAME = "queued-documents";
+ /** The initial size of the term map. */
+ private static final int INITIAL_TERM_MAP_SIZE = 1024;
+
/**
- * How many occurrences to index in each batch. This metric is more
reliable,
- * than document counts, as it does not depend on average document size.
+ * A marker value that gets queued to indicate a request to
+ * write the in-RAM data to a new index batch.
*/
- public static final int DEFAULT_OCCURRENCES_PER_BATCH = 20 * 1000 * 1000;
+ private static final GATEDocument DUMP_BATCH = new GATEDocument(){};
+
+ /**
+ * A marker value that gets queued to indicate a request to combine all the
+ * on-disk batches into a new head.
+ */
+ private static final GATEDocument COMPRESS_INDEX = new GATEDocument(){};
- /** The initial size of the term map. */
- private static final int INITIAL_TERM_MAP_SIZE = 1024;
-
private static Logger logger = Logger.getLogger(AtomicIndex.class);
protected static final PatternFilenameFilter TAILS_FILENAME_FILTER =
@@ -441,11 +494,7 @@
*/
protected long occurrencesInRAM = 0;
- /**
- * How many occurrences to be accumulated in RAM before a new tail batch is
- * written to disk.
- */
- protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
+
/**
* The {@link MimirIndex} that this atomic index is a member of.
@@ -519,19 +568,19 @@
/**
* If a request was made to compress the index (combine all sub-indexes
- * into a new head) this flag will be set to true. The operation will be
+ * into a new head) this value will be non-null. The operation will be
* performed on the indexing thread at the first opportunity. At that point
- * the flag will be reset to false.
+ * this future will complete, and the value will be set back to null.
*/
- protected volatile boolean indexCompressionRequested = false;
+ protected CustomFuture<Void> indexCompressionRequested;
/**
- * If a request was made to write the in-RAM index data to disk this flag
- * will be set to true. The operation will be performed on the indexing
- * thread at the first opportunity. At that point the flag will be reset to
- * false.
+ * If a request was made to write the in-RAM index data to disk this value
+ * will be not null. The operation will be performed on the indexing
+ * thread at the first opportunity. At that point the Future will complete,
+ * and the value will be set back to null.
*/
- protected volatile boolean tailWriteRequested = false;
+ protected CustomFuture<Long> tailWriteRequested;
/**
* Creates a new AtomicIndex
@@ -770,12 +819,11 @@
}
if(hasDirectIndex) {
+ //TODO
// dump new direct tail (invert the tail just written)
// merge new direct tail into direct index cluster
}
- // clear queued-documents folder
-
// clear out internal state, in preparation for the next tail
newBatch();
}
@@ -874,19 +922,37 @@
/**
* Instructs this index to dump to disk all the in-RAM index data at
the fist
* opportunity.
+ * @return a future value that, upon completion, will return the number
of
+ * occurrences written to disk.
*/
- public void requestDumpToDisk() {
- tailWriteRequested = true;
+ public synchronized Future<Long> requestDumpToDisk() {
+ if(tailWriteRequested == null) {
+ tailWriteRequested = new CustomFuture<Long>(noOpLong);
+ }
+ try {
+ inputQueue.put(DUMP_BATCH);
+ } catch(InterruptedException e) {
+ tailWriteRequested.setException(e);
+ }
+ return tailWriteRequested;
}
- public void requestIndexCompression() {
- indexCompressionRequested = true;
+ public synchronized Future<Void> requestIndexCompression() {
+ if(indexCompressionRequested == null) {
+ indexCompressionRequested = new CustomFuture<Void>(noOpVoid);
+ }
+ try {
+ inputQueue.put(COMPRESS_INDEX);
+ } catch(InterruptedException e) {
+ indexCompressionRequested.setException(e);
+ }
+ return indexCompressionRequested;
}
/**
* Opens one sub-index, specified as a directory inside this Atom
Index's
* index directory.
- * @param indexDir
+ * @param subIndexDirname
* @return
* @throws IOException
* @throws IndexException
@@ -944,27 +1010,43 @@
do{
aDocument = inputQueue.take();
if(aDocument != GATEDocument.END_OF_QUEUE) {
- try {
- processDocument(aDocument);
- } catch(Throwable e) {
- logger.error("Problem while indexing document!", e);
- }
- //dump batch if needed AND there is data to dump
- if (occurrencesPerBatch > 0 && occurrencesInRAM >
occurrencesPerBatch){
+ if(aDocument == DUMP_BATCH) {
+ //dump batch was requested
+ long occurencestToWrite = occurrencesInRAM;
writeCurrentTail();
- }
- if(indexCompressionRequested) {
+ synchronized(this) {
+ if(tailWriteRequested != null) {
+ tailWriteRequested.set(occurencestToWrite);
+ tailWriteRequested.done();
+ tailWriteRequested = null;
+ }
+ }
+ } else if(aDocument == COMPRESS_INDEX) {
+ // compress index was requested
compressIndex();
+ synchronized(this) {
+ if(indexCompressionRequested != null) {
+ indexCompressionRequested.done();
+ indexCompressionRequested = null;
+ }
+ }
+ } else {
+ try {
+ long occurencesBefore = occurrencesInRAM;
+ processDocument(aDocument);
+ parent.addOccurrences(occurrencesInRAM - occurencesBefore);
+ } catch(Throwable e) {
+ logger.error("Problem while indexing document!", e);
+ }
}
- if(tailWriteRequested) {
- writeCurrentTail();
- }
} else {
// close down
writeCurrentTail();
flush();
}
- outputQueue.put(aDocument);
+ if(aDocument != DUMP_BATCH && aDocument != COMPRESS_INDEX) {
+ outputQueue.put(aDocument);
+ }
} while(aDocument != GATEDocument.END_OF_QUEUE);
}
}catch(InterruptedException e) {
@@ -980,9 +1062,7 @@
* Closes all file-based resources.
* @throws IOException
*/
- protected void flush() throws IOException {
-
- };
+ abstract protected void flush() throws IOException;
/**
* Notifies this index to stop its indexing operations, and waits for
all data
@@ -1047,9 +1127,15 @@
protected abstract String[] calculateTermStringForAnnotation(Annotation ann,
GATEDocument gateDocument) throws IndexException;
+ /**
+ * Adds the supplied document to the in-RAM index.
+ * @param gateDocument the document to index
+ * @throws IndexException
+ */
protected void processDocument(GATEDocument gateDocument) throws
IndexException{
//zero document related counters
tokenPosition = 0;
+
documentStarting(gateDocument);
//get the annotations to be processed
Annotation[] annotsToProcess = getAnnotsToProcess(gateDocument);
@@ -1079,7 +1165,6 @@
int docLength = tokenPosition + 1;
if(docLength > maxDocSizeInRAM) maxDocSizeInRAM = docLength;
documentSizesInRAM.add(docLength);
-
} catch (IOException e) {
throw new IndexException("IO Exception while indexing", e);
}finally {
@@ -1113,14 +1198,7 @@
}
}
- public long getOccurrencesPerBatch() {
- return occurrencesPerBatch;
- }
- public void setOccurrencesPerBatch(long occurrencesPerBatch) {
- this.occurrencesPerBatch = occurrencesPerBatch;
- }
-
public File getIndexDirectory() {
return indexDirectory;
}
Modified:
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
2014-01-14 17:25:33 UTC (rev 17226)
@@ -21,6 +21,7 @@
import gate.mimir.IndexConfig.TokenIndexerConfig;
import gate.mimir.index.mg4j.GATEDocument;
import gate.mimir.index.mg4j.GATEDocumentFactory;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollection;
import gate.mimir.index.mg4j.zipcollection.DocumentCollectionWriter;
import gate.mimir.index.mg4j.zipcollection.DocumentData;
import it.unimi.di.big.mg4j.index.Index;
@@ -51,27 +52,11 @@
private static final String[] DO_NOT_INDEX = new String[]{};
/**
- * A zip collection builder used to build a zip of the collection
- * if this has been requested.
+ * Is this token index responsible for writing the zip collection?
*/
- protected DocumentCollectionWriter collectionWriter = null;
+ protected boolean zipCollectionEnabled = false;
/**
- * An array of helpers for creating document metadata.
- */
- protected DocumentMetadataHelper[] docMetadataHelpers;
-
- /**
- * Stores the document URI for writing to the zip collection;
- */
- protected String documentURI;
-
- /**
- * Stores the document title for writing to the zip collection.
- */
- protected String documentTitle;
-
- /**
* Stores the document tokens for writing to the zip collection;
*/
protected List<String> documentTokens;
@@ -81,6 +66,10 @@
*/
protected List<String> documentNonTokens;
+ /**
+ * An array of helpers for creating document metadata.
+ */
+ protected DocumentMetadataHelper[] docMetadataHelpers;
/**
* GATE document factory used by the zip builder, and also to
@@ -111,11 +100,13 @@
super(parent, name, indexDirectory, hasDirectIndex, inputQueue,
outputQueue);
this.featureName = config.getFeatureName();
this.termProcessor = config.getTermProcessor();
+ this.zipCollectionEnabled = zipCollection;
+ if(zipCollectionEnabled) {
+ documentTokens = new LinkedList<String>();
+ documentNonTokens = new LinkedList<String>();
+ docMetadataHelpers = parent.getIndexConfig().getDocMetadataHelpers();
+ }
- if(zipCollection) {
- logger.info("Creating zipped collection for field \"" + name + "\"");
- collectionWriter = new DocumentCollectionWriter(indexDirectory);
- }
// save the term processor
additionalProperties.setProperty(Index.PropertyKeys.TERMPROCESSOR,
ObjectParser.toSpec(termProcessor));
@@ -130,11 +121,8 @@
* is about to start.
*/
protected void documentStarting(GATEDocument gateDocument) throws
IndexException {
- if(collectionWriter != null) {
- documentURI = gateDocument.uri().toString();
- documentTitle = gateDocument.title().toString();
- documentTokens = new LinkedList<String>();
- documentNonTokens = new LinkedList<String>();
+ if(zipCollectionEnabled) {
+ // notify the metadata helpers
if(docMetadataHelpers != null){
for(DocumentMetadataHelper aHelper : docMetadataHelpers){
aHelper.documentStart(gateDocument);
@@ -151,19 +139,20 @@
* the current document.
*/
protected void documentEnding(GATEDocument gateDocument) throws
IndexException {
- if(collectionWriter != null) {
- DocumentData docData = new DocumentData(documentURI,
- documentTitle,
- documentTokens.toArray(new String[documentTokens.size()]),
- documentNonTokens.toArray(new
String[documentNonTokens.size()]));
+ if(zipCollectionEnabled) {
+ DocumentData docData = new DocumentData(
+ gateDocument.uri().toString(),
+ gateDocument.title().toString(),
+ documentTokens.toArray(new String[documentTokens.size()]),
+ documentNonTokens.toArray(new String[documentNonTokens.size()]));
if(docMetadataHelpers != null){
for(DocumentMetadataHelper aHelper : docMetadataHelpers){
aHelper.documentEnd(gateDocument, docData);
}
}
- collectionWriter.writeDocument(docData);
- documentTokens = null;
- documentNonTokens = null;
+ parent.writeZipDocumentData(docData);
+ documentTokens.clear();
+ documentNonTokens.clear();
}
}
@@ -204,7 +193,7 @@
String value = (String)tokenFeatures.get(featureName);
currentTerm.replace(value == null ? "" : value);
//save the *unprocessed* term to the collection, if required.
- if(collectionWriter != null) {
+ if(zipCollectionEnabled) {
documentTokens.add(currentTerm.toString());
documentNonTokens.add(gateDocument.getNonTokens()[tokenPosition]);
}
@@ -223,9 +212,5 @@
*/
@Override
protected void flush() throws IOException {
- if(collectionWriter != null) {
- logger.info("Saving zipped collection");
- collectionWriter.close();
- }
- }
+ }
}
Modified:
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/GATEDocument.java
2014-01-14 17:25:33 UTC (rev 17226)
@@ -150,7 +150,7 @@
/**
* Private constructor used to create the {@link #END_OF_QUEUE} instance.
*/
- private GATEDocument(){
+ protected GATEDocument(){
}
public GATEDocument(gate.Document gateDocument,
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java
2014-01-10 16:36:03 UTC (rev 17225)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/search/QueryEngine.java
2014-01-14 17:25:33 UTC (rev 17226)
@@ -19,6 +19,7 @@
import gate.mimir.DocumentRenderer;
import gate.mimir.IndexConfig;
import gate.mimir.IndexConfig.SemanticIndexerConfig;
+import gate.mimir.MimirIndex;
import gate.mimir.SemanticAnnotationHelper;
import gate.mimir.index.IndexException;
import gate.mimir.index.Indexer;
@@ -76,31 +77,7 @@
*/
public class QueryEngine {
- private class WriteDeletedDocsTask extends TimerTask {
- public void run() {
- synchronized(writeDeletedDocsTimer) {
- File delFile = new File(indexDir, DELETED_DOCUMENT_IDS_FILE_NAME);
- if(delFile.exists()) {
- delFile.delete();
- }
- try{
- logger.debug("Writing deleted documents set");
- ObjectOutputStream oos = new ObjectOutputStream(
- new GZIPOutputStream(
- new BufferedOutputStream(
- new FileOutputStream(delFile))));
- oos.writeObject(deletedDocumentIds);
- oos.flush();
- oos.close();
- logger.debug("Writing deleted documents set completed.");
- }catch (IOException e) {
- logger.error("Exception while writing deleted documents set", e);
- }
- }
- }
- }
-
/**
* Represents the type of index that should be searched. Mimir uses two types
* of indexes: token indexes (which index the text input) and annotation
@@ -134,55 +111,12 @@
protected IndexReaderPool[] directIndexReaderPools;
/**
- * The zipped document collection from MG4J (built during the indexing of the
- * first token feature). This can be used to obtain the document text and to
- * display the content of the hits.
- */
- protected DocumentCollection documentCollection;
-
- /**
- * A cache of {@link DocumentData} values used for returning the various
- * document details (title, URI, text).
- */
- protected Long2ObjectLinkedOpenHashMap<DocumentData> documentCache;
-
- /**
- * The maximum number of documents to be stored in the document cache.
- */
- protected static final int DOCUMENT_CACHE_SIZE = 100;
-
- /**
- * The set of IDs for the documents marked as deleted.
- */
- private transient SortedSet<Long> deletedDocumentIds;
-
- /**
- * A timer used to execute the writing of deleted documents data to disk.
- * This timer is used to create a delay, allowing a batch of writes to be
- * coalesced into a single one.
- */
- private transient Timer writeDeletedDocsTimer;
-
- /**
- * The timer task used to top write to disk the deleted documents data.
- * This value is non-null only when there is a pending write.
- */
- private volatile transient WriteDeletedDocsTask writeDeletedDocsTask;
-
- /**
* The document sizes used during search time (if running in document mode) to
* simulate document-spanning annotations.
*/
private transient IntBigList documentSizes;
/**
- * The name for the file (stored in the root index directory) containing
- * the serialised version of the {@link #deletedDocumentIds}.
- */
- public static final String DELETED_DOCUMENT_IDS_FILE_NAME = "deleted.ser";
-
-
- /**
* The maximum size of an index that can be loaded in memory (by default 64
* MB).
*/
@@ -194,11 +128,10 @@
*/
public static final int DEFAULT_DOCUMENT_BLOCK_SIZE = 1000;
-
/**
- * The top level directory of the Mimir index.
+ * The index being searched.
*/
- protected File indexDir;
+ protected MimirIndex index;
/**
* The index configuration this index was built from.
@@ -242,13 +175,6 @@
* files).
*/
private List<QueryRunner> activeQueryRunners;
-
- /**
- * @return the indexDir
- */
- public File getIndexDir() {
- return indexDir;
- }
/**
* Are sub-bindings used in this query engine. Sub-bindings are used to
@@ -513,7 +439,18 @@
return indexConfig;
}
+
+
/**
+ * Constructs a new query engine for a {@link MimirIndex}.
+ * @param index the index to be searched.
+ */
+ public QueryEngine(MimirIndex index) {
+ this.index = index;
+
+ }
+
+ /**
* Constructs a new {@link QueryEngine} for a specified Mimir index. The mimir
* semantic repository will be initialized using the default location in the
* filesystem, provided by the IndexConfig
@@ -524,7 +461,6 @@
* if there are problems while opening the indexes.
*/
public QueryEngine(File indexDir) throws gate.mimir.index.IndexException {
- this.indexDir = indexDir;
// read the index config
try {
indexConfig =
@@ -545,8 +481,8 @@
}
}
}
- readDeletedDocs();
+
activeQueryRunners = Collections.synchronizedList(
new ArrayList<QueryRunner>());
} catch(FileNotFoundException e) {
@@ -555,7 +491,7 @@
throw new IndexException("Input/output exception!", e);
}
subBindingsEnabled = false;
- writeDeletedDocsTimer = new Timer("Delete documents writer");
+
}
/**
@@ -712,7 +648,7 @@
*/
public String[][] getRightContext(Binding hit, int numTokens)
throws IndexException {
- DocumentData docData = getDocumentData(hit.getDocumentId());
+ DocumentData docData = index.getDocumentData(hit.getDocumentId());
int startOffset = hit.getTermPosition() + hit.getLength();
if(startOffset >= docData.getTokens().length) {
// hit is at the end of the document
@@ -751,7 +687,7 @@
*/
public String[][] getText(long documentID, int termPosition, int length)
throws IndexException {
- return getDocumentData(documentID).getText(termPosition, length);
+ return index.getDocumentData(documentID).getText(termPosition, length);
}
/**
@@ -773,15 +709,15 @@
DocumentRenderer docRenderer = indexConfig.getDocumentRenderer();
if(docRenderer == null) { throw new IndexException(
"No document renderer is configured for this index!"); }
- docRenderer.render(getDocumentData(docID), hits, output);
+ docRenderer.render(index.getDocumentData(docID), hits, output);
}
public String getDocumentTitle(long docID) throws IndexException {
- return getDocumentData(docID).getDocumentTitle();
+ return index.getDocumentData(docID).getDocumentTitle();
}
public String getDocumentURI(long docID) throws IndexException {
- return getDocumentData(docID).getDocumentURI();
+ return index.getDocumentData(docID).getDocumentURI();
}
/**
@@ -798,35 +734,11 @@
*/
public Serializable getDocumentMetadataField(long docID, String fieldName)
throws IndexException {
- return getDocumentData(docID).getMetadataField(fieldName);
+ return index.getDocumentData(docID).getMetadataField(fieldName);
}
- /**
- * Gets the {@link DocumentData} for a given document ID, from the on disk
- * document collection. In memory caching is performed to reduce the cost of
- * this call.
- * @param documentID
- * the ID of the document to be obtained.
- * @return the {@link DocumentData} associated with the given document ID.
- * @throws IndexException
- */
- public synchronized DocumentData getDocumentData(long documentID)
- throws IndexException {
- if(isDeleted(documentID)) {
- throw new IndexException("Invalid document ID " + documentID);
- }
- DocumentData documentData = documentCache.getAndMoveToFirst(documentID);
- if(documentData == null) {
- // cache miss
- documentData = documentCollection.getDocumentData(documentID);
- documentCache.putAndMoveToFirst(documentID, documentData);
- if(documentCache.size() > DOCUMENT_CACHE_SIZE) {
- documentCache.removeLast();
- }
- }
- return documentData;
- }
+
/**
* Closes this {@link QueryEngine} and releases all resources.
*/
@@ -842,8 +754,7 @@
logger.error("Exception while closing query runner.", e);
}
}
- // close the document collection
- documentCollection.close();
+
// close all the semantic indexers
logger.info("Closing Semantic Annotation Helpers.");
if(indexConfig.getSemanticIndexers() != null) {
@@ -856,16 +767,6 @@
}
}
}
- // write the deleted documents set
- synchronized(writeDeletedDocsTimer) {
- if(writeDeletedDocsTask != null) {
- writeDeletedDocsTask.cancel();
- }
- writeDeletedDocsTimer.cancel();
- // explicitly call it one last time
- new WriteDeletedDocsTask().run();
- }
- documentCache.clear();
for(IndexReaderPool aPool : indexReaderPools) {
try {
aPool.close();
@@ -904,7 +805,8 @@
indexConfig.getSemanticIndexers().length];
try {
- File mg4JIndexDir = new File(indexDir, Indexer.MG4J_INDEX_DIRNAME);
+// File mg4JIndexDir = new File(indexDir, Indexer.MG4J_INDEX_DIRNAME);
+ File mg4JIndexDir = null;
// Load the token indexes
for(int i = 0; i < indexConfig.getTokenIndexers().length; i++) {
File indexBasename =
@@ -938,10 +840,8 @@
openOneSubIndex(indexBasename.toURI());
}
}
- // open the zipped document collection
- documentCollection = new DocumentCollection(indexDir);
- // prepare the document cache
- documentCache = new Long2ObjectLinkedOpenHashMap<DocumentData>();
+
+
} catch(IOException e) {
// IOException gets thrown upward
throw e;
@@ -995,106 +895,5 @@
MG4JTools.upgradeIndex(indexUri);
}
- /**
- * Marks a given document (identified by its ID) as deleted. Deleted documents
- * are never returned as search results.
- * @param documentId
- */
- public void deleteDocument(long documentId) {
- if(deletedDocumentIds.add(documentId)) {
- writeDeletedDocsLater();
- }
- }
- /**
- * Marks the given batch of documents (identified by ID) as deleted. Deleted
- * documents are never returned as search results.
- * @param documentIds
- */
- public void deleteDocuments(Collection<? extends Number> documentIds) {
- List<Long> idsToDelete = new ArrayList<Long>(documentIds.size());
- for(Number n : documentIds) {
- idsToDelete.add(Long.valueOf(n.longValue()));
- }
- if(deletedDocumentIds.addAll(idsToDelete)) {
- writeDeletedDocsLater();
- }
- }
-
- /**
- * Checks whether a given document (specified by its ID) is marked as deleted.
- * @param documentId
- * @return
- */
- public boolean isDeleted(long documentId) {
- return deletedDocumentIds.contains(documentId);
- }
-
- /**
- * Mark the given document (identified by ID) as <i>not</i> deleted. Calling
- * this method for a document ID that is not currently marked as deleted has
- * no effect.
- */
- public void undeleteDocument(long documentId) {
- if(deletedDocumentIds.remove(documentId)) {
- writeDeletedDocsLater();
- }
- }
-
- /**
- * Mark the given documents (identified by ID) as <i>not</i> deleted. Calling
- * this method for a document ID that is not currently marked as deleted has
- * no effect.
- */
- public void undeleteDocuments(Collection<? extends Number> documentIds) {
- List<Long> idsToUndelete = new ArrayList<Long>(documentIds.size());
- for(Number n : documentIds) {
- idsToUndelete.add(Long.valueOf(n.longValue()));
- }
- if(deletedDocumentIds.removeAll(idsToUndelete)) {
- writeDeletedDocsLater();
- }
- }
-
- /**
- * Writes the set of deleted document to disk in a background thread, after a
- * short delay. If a previous request has not started yet, this new request
- * will replace it.
- */
- protected void writeDeletedDocsLater() {
- synchronized(writeDeletedDocsTimer) {
- if(writeDeletedDocsTask != null) {
- writeDeletedDocsTask.cancel();
- }
- writeDeletedDocsTask = new WriteDeletedDocsTask();
- writeDeletedDocsTimer.schedule(writeDeletedDocsTask, 1000);
- }
- }
-
- /**
- * Reads the list of deleted documents from disk.
- */
- @SuppressWarnings("unchecked")
- protected synchronized void readDeletedDocs() throws IOException{
- deletedDocumentIds = Collections.synchronizedSortedSet(
- new TreeSet<Long>());
- File delFile = new File(indexDir, DELETED_DOCUMENT_IDS_FILE_NAME);
- if(delFile.exists()) {
- try {
- ObjectInputStream ois = new ObjectInputStream(
- new GZIPInputStream(
- new BufferedInputStream(
- new FileInputStream(delFile))));
- // an old index will have saved a Set<Integer>, a new one will be
- // Set<Long>
- Set<? extends Number> savedSet = (Set<? extends
Number>)ois.readObject();
- for(Number n : savedSet) {
- deletedDocumentIds.add(Long.valueOf(n.longValue()));
- }
- } catch(ClassNotFoundException e) {
- // this should never happen
- throw new RuntimeException(e);
- }
- }
- }
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
CenturyLink Cloud: The Leader in Enterprise Cloud Services.
Learn Why More Businesses Are Choosing CenturyLink Cloud For
Critical Workloads, Development Environments & Everything In Between.
Get a Quote or Start a Free Trial Today.
http://pubads.g.doubleclick.net/gampad/clk?id=119420431&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs