Revision: 17299
http://sourceforge.net/p/gate/code/17299
Author: valyt
Date: 2014-02-13 15:20:19 +0000 (Thu, 13 Feb 2014)
Log Message:
-----------
First bash at automatic compaction.
Modified Paths:
--------------
mimir/branches/5.0/mimir-core/src/gate/mimir/IndexConfig.java
mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/IndexConfig.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/IndexConfig.java	2014-02-13 15:00:13 UTC (rev 17298)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/IndexConfig.java	2014-02-13 15:20:19 UTC (rev 17299)
@@ -229,7 +229,13 @@
*/
public static final int DEFAULT_TIME_BETWEEN_BATCHES = 3600 * 1000;
+
/**
+ * The default value for {@link #maximumBatches}
+ */
+ public static final int DEFAULT_MAXIMUM_BATCHES = 20;
+
+ /**
* A Map storing values that need to be passed between the various pluggable
* components used by this index (e.g. ORDI-based annotation helpers may
* pass references to the ORDI Factory between each other).
@@ -347,15 +353,53 @@
return semanticIndexers;
}
+ /**
+ * Gets the current value for the time interval (in milliseconds) between the
+ * saving of a batch and the next. This is the maximum interval documents
+ * submitted for indexing are kept in RAM (and are thus not searchable).
+ *
+ * Defaults to {@value #DEFAULT_TIME_BETWEEN_BATCHES}.
+ * @return
+ */
public int getTimeBetweenBatches() {
return timeBetweenBatches;
}
+ /**
+ * Sets the current value for the time interval (in milliseconds) between the
+ * saving of a batch and the next. This is the maximum interval documents
+ * submitted for indexing are kept in RAM (and are thus not searchable).
+ *
+ * Defaults to {@value #DEFAULT_TIME_BETWEEN_BATCHES}.
+ */
public void setTimeBetweenBatches(int timeBetweenBatches) {
this.timeBetweenBatches = timeBetweenBatches;
}
/**
+ * Gets the maximum number of on-disk index batches before an index compaction
+ * is triggered.
+ *
+ * Defaults to {@link #DEFAULT_MAXIMUM_BATCHES}.
+ * @return
+ */
+ public int getMaximumBatches() {
+ return maximumBatches;
+ }
+
+
+ /**
+ * Sets the maximum number of on-disk index batches before an index compaction
+ * is triggered.
+ *
+ * Defaults to {@link #DEFAULT_MAXIMUM_BATCHES}.
+ * @param maximumBatches
+ */
+ public void setMaximumBatches(int maximumBatches) {
+ this.maximumBatches = maximumBatches;
+ }
+
+ /**
* Gets the options map - a Map with arbitrary configuration options, which
* is made available to all sub-elements of this index (e.g. the various
* annotation helpers).
@@ -605,7 +649,14 @@
*/
private int timeBetweenBatches = DEFAULT_TIME_BETWEEN_BATCHES;
+
/**
+ * The maximum number of constituent batches in any atomic index before a
+ * compact operation is triggered.
+ */
+ private int maximumBatches = DEFAULT_MAXIMUM_BATCHES;
+
+ /**
* A Map with arbitrary configuration options, which is made available to all
* sub-elements of this index (e.g. the various annotation helpers).
*/
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java	2014-02-13 15:00:13 UTC (rev 17298)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java	2014-02-13 15:20:19 UTC (rev 17299)
@@ -45,6 +45,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
@@ -170,6 +171,10 @@
}
}
+ /**
+ * {@link TimerTask} used to regularly dump the latest documents to an on-disk
+ * batch, allowing them to become searchable.
+ */
protected class DumpToDiskTask extends TimerTask {
@Override
public void run() {
@@ -185,6 +190,67 @@
}
/**
+ * {@link TimerTask} used to regularly check if the index needs compacting,
+ * and to perform the compaction if so.
+ */
+ protected class CompactIndexTask extends TimerTask {
+
+ public static final int SCHEDULE_INTERVAL = 10 * 60 * 1000;
+
+ @Override
+ public void run() {
+ boolean shouldCompact = false;
+ for(AtomicIndex aSubIndex : subIndexes) {
+ if(aSubIndex.getBatchCount() > indexConfig.getMaximumBatches()) {
+ shouldCompact = true;
+ break;
+ }
+ }
+ if(shouldCompact) {
+ List<Future<Void>> futures = requestCompactIndex();
+ try {
+ compactDocumentCollection();
+ } catch(Exception e) {
+ logger.error("Error while compacting document collection. "
+ + "Index is now invalid. Closing index to avoid further damage.",
+ e);
+ try {
+ close();
+ } catch(InterruptedException e1) {
+ logger.error("Received interrupt request while closing "
+ + "operation in progress", e);
+ } catch(IOException e1) {
+ logger.error("Further IO exception while closing index.", e1);
+ }
+ }
+ for(Future<Void> f : futures){
+ try {
+ f.get();
+ } catch(InterruptedException e) {
+ // we were interrupted while waiting for a compacting operation
+ logger.error("Received interrupt request while compacting "
+ + "operation in progress", e);
+ } catch(ExecutionException e) {
+ logger.error("Execution exception while comapting the index. "
+ + "Index may now be corrupted, closing it to avoid further damage", e);
+ try {
+ close();
+ } catch(InterruptedException e1) {
+ logger.error("Received interrupt request while closing "
+ + "operation in progress", e);
+ } catch(IOException e1) {
+ logger.error("Further IO exception while closing index.", e1);
+ }
+ }
+ }
+ }
+ synchronized(maintenanceTimer) {
+ maintenanceTimer.schedule(new CompactIndexTask(), SCHEDULE_INTERVAL);
+ }
+ }
+
+ }
+ /**
* The {@link IndexConfig} used for this index.
*/
protected IndexConfig indexConfig;
@@ -376,6 +442,8 @@
dumpToDiskTask = new DumpToDiskTask();
maintenanceTimer.schedule(dumpToDiskTask,
indexConfig.getTimeBetweenBatches());
+ maintenanceTimer.schedule(new CompactIndexTask(),
+ CompactIndexTask.SCHEDULE_INTERVAL);
}
// open the zipped document collection
documentCollection = new DocumentCollection(indexDirectory);
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java	2014-02-13 15:00:13 UTC (rev 17298)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java	2014-02-13 15:20:19 UTC (rev 17299)
@@ -690,7 +690,7 @@
/**
* A list containing the head and tails of this index.
*/
- protected List<MG4JIndex> subIndexes;
+ protected List<MG4JIndex> batches;
/**
* The cluster-view of all the MG4J indexes that are part of this index (i.e.
@@ -843,14 +843,14 @@
*/
protected void initIndex() throws IOException, IndexException {
// open the index
- subIndexes = new ArrayList<AtomicIndex.MG4JIndex>();
+ batches = new ArrayList<AtomicIndex.MG4JIndex>();
if(indexDirectory.exists()) {
// opening an existing index
- List<String> batches = new ArrayList<String>();
+ List<String> batchNames = new ArrayList<String>();
File headDir = new File(indexDirectory, HEAD_FILE_NAME);
if(headDir.exists()) {
- batches.add(HEAD_FILE_NAME);
+ batchNames.add(HEAD_FILE_NAME);
}
Map<Integer, String> tails = new TreeMap<Integer, String>();
for(String aTail : indexDirectory.list(TAILS_FILENAME_FILTER)) {
@@ -859,12 +859,12 @@
aTail);
}
// add the tails in order
- batches.addAll(tails.values());
+ batchNames.addAll(tails.values());
// modify internal state
synchronized(this) {
// load all batches, in order
- for(String batchName : batches) {
- subIndexes.add(openSubIndex(batchName));
+ for(String batchName : batchNames) {
+ batches.add(openSubIndex(batchName));
}
}
} else {
@@ -872,7 +872,7 @@
indexDirectory.mkdirs();
}
synchronized(this) {
- invertedIndex = openInvertedIndexCluster(subIndexes, termProcessor);
+ invertedIndex = openInvertedIndexCluster(batches, termProcessor);
}
// open direct index
if(hasDirectIndex) {
@@ -892,7 +892,7 @@
}
}
synchronized(this) {
- directIndex = openDirectIndexCluster(subIndexes);
+ directIndex = openDirectIndexCluster(batches);
}
}
}
@@ -1072,10 +1072,10 @@
try {
// modify internal state
synchronized(this) {
- subIndexes.add(openSubIndex(newTailName));
- invertedIndex = openInvertedIndexCluster(subIndexes, termProcessor);
+ batches.add(openSubIndex(newTailName));
+ invertedIndex = openInvertedIndexCluster(batches, termProcessor);
if(hasDirectIndex) {
- directIndex = openDirectIndexCluster(subIndexes);
+ directIndex = openDirectIndexCluster(batches);
}
}
} catch(Exception e) {
@@ -1108,7 +1108,7 @@
// The document pointers in RAM are zero-based, so we need to add all the
// documents on disk to this.
long docsOnDisk = 0;
- for(MG4JIndex index : subIndexes) {
+ for(MG4JIndex index : batches) {
docsOnDisk += index.invertedIndex.numberOfDocuments;
}
@@ -1285,7 +1285,7 @@
File headDirNew = new File(indexDirectory, HEAD_FILE_NAME + HEAD_NEW_EXT);
// make a local copy of the sub-indexes
List<MG4JIndex> indexesToMerge =
- new ArrayList<AtomicIndex.MG4JIndex>(subIndexes);
+ new ArrayList<AtomicIndex.MG4JIndex>(batches);
if(!headDirNew.mkdir()) {
throw new IndexException("Could not create new head directory at " +
headDirNew.getAbsolutePath() + "!");
@@ -1333,16 +1333,16 @@
// update the internal state
synchronized(this) {
// remove the indexes that were merged
- subIndexes.removeAll(indexesToMerge);
+ batches.removeAll(indexesToMerge);
// insert the new head at the front of the list
File headDir = new File(indexDirectory, HEAD_FILE_NAME);
File headDirOld = new File(indexDirectory, HEAD_FILE_NAME + HEAD_OLD_EXT);
if(headDir.exists() && headDir.renameTo(headDirOld)){
if(headDirNew.renameTo(headDir)) {
- subIndexes.add(0, openSubIndex(HEAD_FILE_NAME));
- invertedIndex = openInvertedIndexCluster(subIndexes, termProcessor);
+ batches.add(0, openSubIndex(HEAD_FILE_NAME));
+ invertedIndex = openInvertedIndexCluster(batches, termProcessor);
if(hasDirectIndex) {
- directIndex =openDirectIndexCluster(subIndexes);
+ directIndex =openDirectIndexCluster(batches);
}
// clean-up: delete old head, used-up tails
if(!gate.util.Files.rmdir(headDirOld)) {
@@ -1858,4 +1858,12 @@
String termStr = directTerms.get(directTermId);
return invertedIndex.documents(termStr).count();
}
+
+ /**
+ * Returns the number of batches in this atomic index.
+ * @return
+ */
+ public int getBatchCount() {
+ return batches.size();
+ }
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Android apps run on BlackBerry 10
Introducing the new BlackBerry 10.2.1 Runtime for Android apps.
Now with support for Jelly Bean, Bluetooth, Mapview and more.
Get your Android app in front of a whole new audience. Start now.
http://pubads.g.doubleclick.net/gampad/clk?id=124407151&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs