Revision: 14764 http://gate.svn.sourceforge.net/gate/?rev=14764&view=rev Author: ian_roberts Date: 2011-12-15 12:22:46 +0000 (Thu, 15 Dec 2011) Log Message: ----------- Implementation of FederatedQueryRunner
Modified Paths: -------------- mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java Modified: mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java =================================================================== --- mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java 2011-12-15 11:10:39 UTC (rev 14763) +++ mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java 2011-12-15 12:22:46 UTC (rev 14764) @@ -36,54 +36,12 @@ public class FederatedQueryRunner implements QueryRunner { /** - * Background action responsible with updating the document statistics from - * the sub-runners. - */ - private class DocumentDataUpdater implements Runnable { - - @Override - public void run() { - boolean allDone = false; - while(!allDone) { - allDone = true; - int newCurrentDocCount = 0; - for(QueryRunner aRunner : subrunners) { - int subDocCount = aRunner.getDocumentsCount(); - if(subDocCount > 0) { - newCurrentDocCount += subDocCount; - } else { - allDone = false; - newCurrentDocCount += aRunner.getDocumentsCurrentCount(); - } - } - currentDocumentsCount = newCurrentDocCount; - if(allDone) { - documentsCount = newCurrentDocCount; - } else { - try { - // wait a while and try again - Thread.sleep(100); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - } - - /** * The total number of result documents (or -1 if not yet known). */ - private volatile int documentsCount; + private int documentsCount = -1; - /** - * The current number of documents. After all documents have been retrieved, - * this value is identical to {@link #documentsCount}. - */ - private volatile int currentDocumentsCount; + private boolean closed; - private volatile boolean closed; - /** * Shared Logger */ @@ -92,9 +50,14 @@ /** * The query runners for the sub-indexes. */ - protected QueryRunner[] subrunners; + protected QueryRunner[] subRunners; /** + * The next rank that needs to be merged from each sub runner. + */ + protected int[] nextSubRunnerRank; + + /** * For each result document rank, this list supplies the index for the * sub-runner that supplied the document. */ @@ -107,25 +70,9 @@ */ protected IntList rank2subRank; - /** - * The score for each result document. - */ - protected DoubleList rank2score; - - /** - * The documentID (within the original sub-runner) for each result document. - */ - protected IntList rank2subDocumentId; - public FederatedQueryRunner(QueryRunner[] subrunners, Executor threadSource) { - this.subrunners = subrunners; - DocumentDataUpdater docDataUpdater = new DocumentDataUpdater(); - if(threadSource != null) { - threadSource.execute(docDataUpdater); - } else { - new Thread(docDataUpdater, - DocumentDataUpdater.class.getCanonicalName()).start(); - } + this.subRunners = subrunners; + this.nextSubRunnerRank = null; } /* (non-Javadoc) @@ -133,6 +80,27 @@ */ @Override public int getDocumentsCount() { + if(documentsCount < 0) { + int newDocumentsCount = 0; + for(QueryRunner subRunner : subRunners) { + int subDocumentsCount = subRunner.getDocumentsCount(); + if(subDocumentsCount < 0) { + return -1; + } else { + newDocumentsCount += subDocumentsCount; + } + } + synchronized(this) { + // initialize the nextSubRunnerRank array + nextSubRunnerRank = new int[subRunners.length]; + for(int i = 0; i < nextSubRunnerRank.length; i++) { + if(subRunners[i].getDocumentsCount() == 0) { + nextSubRunnerRank[i] = -1; + } + } + documentsCount = newDocumentsCount; + } + } return documentsCount; } @@ -141,17 +109,55 @@ */ @Override public int getDocumentsCurrentCount() { - return (documentsCount < 0) ? currentDocumentsCount : documentsCount; + if(documentsCount >= 0) { + return documentsCount; + } else { + int newDocumentsCount = 0; + for(QueryRunner subRunner : subRunners) { + newDocumentsCount += subRunner.getDocumentsCurrentCount(); + } + return newDocumentsCount; + } } - + + /** + * Ensure that the given rank is resolved to the appropriate sub-runner rank. + * @throws IndexOutOfBoundsException if rank is beyond the last document. + */ + private final synchronized void checkRank(int rank) throws IndexOutOfBoundsException, IOException { + // quick check to see if we need to do anything else + if(rank < rank2runnerIndex.size()) { + return; + } + + for(int nextRank = rank2runnerIndex.size(); nextRank <= rank; nextRank++) { + int subRunnerWithMin = 0; + for(int i = 1; i < subRunners.length; i++) { + if((nextSubRunnerRank[i] >= 0) && + (subRunners[i].getDocumentScore(nextSubRunnerRank[i]) < + subRunners[subRunnerWithMin].getDocumentScore(nextSubRunnerRank[subRunnerWithMin]))) { + subRunnerWithMin = i; + } + } + // consume the next doc from subRunnerWithMin + rank2runnerIndex.add(subRunnerWithMin); + rank2subRank.add(nextSubRunnerRank[subRunnerWithMin]++); + if(nextSubRunnerRank[subRunnerWithMin] >= subRunners[subRunnerWithMin].getDocumentsCount()) { + // this runner has run out of documents + nextSubRunnerRank[subRunnerWithMin] = -1; + } + } + } + /* (non-Javadoc) * @see gate.mimir.search.QueryRunner#getDocumentID(int) */ @Override public int getDocumentID(int rank) throws IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return 0; + checkRank(rank); + int subId = subRunners[rank2runnerIndex.getInt(rank)].getDocumentID(rank2subRank.getInt(rank)); + return subId * subRunners.length + rank2runnerIndex.getInt(rank); } /* (non-Javadoc) @@ -160,8 +166,8 @@ @Override public double getDocumentScore(int rank) throws IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return 0; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentScore(rank2subRank.getInt(rank)); } /* (non-Javadoc) @@ -170,8 +176,8 @@ @Override public List<Binding> getDocumentHits(int rank) throws IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return null; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentHits(rank2subRank.getInt(rank)); } /* (non-Javadoc) @@ -180,8 +186,8 @@ @Override public String[][] getDocumentText(int rank, int termPosition, int length) throws IndexException, IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return null; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentText(rank2subRank.getInt(rank), termPosition, length); } /* (non-Javadoc) @@ -190,8 +196,8 @@ @Override public String getDocumentURI(int rank) throws IndexException, IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return null; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentURI(rank2subRank.getInt(rank)); } /* (non-Javadoc) @@ -200,8 +206,8 @@ @Override public String getDocumentTitle(int rank) throws IndexException, IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return null; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentTitle(rank2subRank.getInt(rank)); } /* (non-Javadoc) @@ -210,8 +216,8 @@ @Override public Serializable getDocumentMetadataField(int rank, String fieldName) throws IndexException, IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return null; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataField(rank2subRank.getInt(rank), fieldName); } /* (non-Javadoc) @@ -221,8 +227,8 @@ public Map<String, Serializable> getDocumentMetadataFields(int rank, Set<String> fieldNames) throws IndexException, IndexOutOfBoundsException, IOException { - // TODO Auto-generated method stub - return null; + checkRank(rank); + return subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataFields(rank2subRank.getInt(rank), fieldNames); } /* (non-Javadoc) @@ -231,7 +237,8 @@ @Override public void renderDocument(int rank, Appendable out) throws IOException, IndexException { - // TODO Auto-generated method stub + checkRank(rank); + subRunners[rank2runnerIndex.getInt(rank)].renderDocument(rank2subRank.getInt(rank), out); } /* (non-Javadoc) @@ -239,6 +246,8 @@ */ @Override public void close() throws IOException { - // TODO Auto-generated method stub + for(QueryRunner r : subRunners) { + r.close(); + } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ 10 Tips for Better Server Consolidation Server virtualization is being driven by many needs. But none more important than the need to reduce IT complexity while improving strategic productivity. Learn More! http://www.accelacomm.com/jaw/sdnl/114/51507609/ _______________________________________________ GATE-cvs mailing list GATE-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/gate-cvs