Revision: 14764
          http://gate.svn.sourceforge.net/gate/?rev=14764&view=rev
Author:   ian_roberts
Date:     2011-12-15 12:22:46 +0000 (Thu, 15 Dec 2011)
Log Message:
-----------
Implementation of FederatedQueryRunner

Modified Paths:
--------------
    mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java

Modified: mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java
===================================================================
--- mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java      
2011-12-15 11:10:39 UTC (rev 14763)
+++ mimir/trunk/mimir-core/src/gate/mimir/search/FederatedQueryRunner.java      
2011-12-15 12:22:46 UTC (rev 14764)
@@ -36,54 +36,12 @@
 public class FederatedQueryRunner implements QueryRunner {
   
   /**
-   * Background action responsible with updating the document statistics from 
-   * the sub-runners.
-   */
-  private class DocumentDataUpdater implements Runnable {
-
-    @Override
-    public void run() {
-      boolean allDone = false;
-      while(!allDone) {
-        allDone = true;
-        int newCurrentDocCount = 0;
-        for(QueryRunner aRunner : subrunners) {
-          int subDocCount = aRunner.getDocumentsCount();
-          if(subDocCount > 0) {
-            newCurrentDocCount += subDocCount;
-          } else {
-            allDone = false;
-            newCurrentDocCount += aRunner.getDocumentsCurrentCount();
-          }
-        }
-        currentDocumentsCount = newCurrentDocCount;
-        if(allDone) {
-          documentsCount = newCurrentDocCount;
-        } else {
-          try {
-            // wait a while and try again
-            Thread.sleep(100);
-          } catch(InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    }
-  }
-  
-  /**
    * The total number of result documents (or -1 if not yet known).
    */
-  private volatile int documentsCount;
+  private int documentsCount = -1;
   
-  /**
-   * The current number of documents. After all documents have been retrieved, 
-   * this value is identical to {@link #documentsCount}. 
-   */
-  private volatile int currentDocumentsCount;
+  private boolean closed;
   
-  private volatile boolean closed;
-  
   /**
    * Shared Logger
    */
@@ -92,9 +50,14 @@
   /**
    * The query runners for the sub-indexes.
    */
-  protected QueryRunner[] subrunners;
+  protected QueryRunner[] subRunners;
   
   /**
+   * The next rank that needs to be merged from each sub runner.
+   */
+  protected int[] nextSubRunnerRank;
+  
+  /**
    * For each result document rank, this list supplies the index for the 
    * sub-runner that supplied the document.  
    */
@@ -107,25 +70,9 @@
    */  
   protected IntList rank2subRank;
   
-  /**
-   * The score for each result document. 
-   */
-  protected DoubleList rank2score;
-  
-  /**
-   * The documentID (within the original sub-runner) for each result document.
-   */
-  protected IntList rank2subDocumentId;
-  
   public FederatedQueryRunner(QueryRunner[] subrunners, Executor threadSource) 
{
-    this.subrunners = subrunners;
-    DocumentDataUpdater docDataUpdater = new DocumentDataUpdater();
-    if(threadSource != null) {
-      threadSource.execute(docDataUpdater);
-    } else {
-      new Thread(docDataUpdater, 
-        DocumentDataUpdater.class.getCanonicalName()).start();
-    }    
+    this.subRunners = subrunners;
+    this.nextSubRunnerRank = null;
   }
 
   /* (non-Javadoc)
@@ -133,6 +80,27 @@
    */
   @Override
   public int getDocumentsCount() {
+    if(documentsCount < 0) {
+      int newDocumentsCount = 0;
+      for(QueryRunner subRunner : subRunners) {
+        int subDocumentsCount = subRunner.getDocumentsCount();
+        if(subDocumentsCount < 0) {
+          return -1;
+        } else {
+          newDocumentsCount += subDocumentsCount;
+        }
+      }
+      synchronized(this) {
+        // initialize the nextSubRunnerRank array
+        nextSubRunnerRank = new int[subRunners.length];
+        for(int i = 0; i < nextSubRunnerRank.length; i++) {
+          if(subRunners[i].getDocumentsCount() == 0) {
+            nextSubRunnerRank[i] = -1;
+          }
+        }
+        documentsCount = newDocumentsCount;
+      }
+    }
     return documentsCount;
   }
 
@@ -141,17 +109,55 @@
    */
   @Override
   public int getDocumentsCurrentCount() {
-    return (documentsCount < 0) ? currentDocumentsCount : documentsCount;
+    if(documentsCount >= 0) {
+      return documentsCount;
+    } else {
+      int newDocumentsCount = 0;
+      for(QueryRunner subRunner : subRunners) {
+        newDocumentsCount += subRunner.getDocumentsCurrentCount();
+      }
+      return newDocumentsCount;
+    }
   }
-
+  
+  /**
+   * Ensure that the given rank is resolved to the appropriate sub-runner rank.
+   * @throws IndexOutOfBoundsException if rank is beyond the last document.
+   */
+  private final synchronized void checkRank(int rank) throws 
IndexOutOfBoundsException, IOException {
+    // quick check to see if we need to do anything else
+    if(rank < rank2runnerIndex.size()) {
+      return;
+    }
+    
+    for(int nextRank = rank2runnerIndex.size(); nextRank <= rank; nextRank++) {
+      int subRunnerWithMin = 0;
+      for(int i = 1; i < subRunners.length; i++) {
+        if((nextSubRunnerRank[i] >= 0) &&
+            (subRunners[i].getDocumentScore(nextSubRunnerRank[i]) <
+            
subRunners[subRunnerWithMin].getDocumentScore(nextSubRunnerRank[subRunnerWithMin])))
 {
+          subRunnerWithMin = i;
+        }
+      }
+      // consume the next doc from subRunnerWithMin
+      rank2runnerIndex.add(subRunnerWithMin);
+      rank2subRank.add(nextSubRunnerRank[subRunnerWithMin]++);
+      if(nextSubRunnerRank[subRunnerWithMin] >= 
subRunners[subRunnerWithMin].getDocumentsCount()) {
+        // this runner has run out of documents
+        nextSubRunnerRank[subRunnerWithMin] = -1;
+      }
+    }
+  }
+  
   /* (non-Javadoc)
    * @see gate.mimir.search.QueryRunner#getDocumentID(int)
    */
   @Override
   public int getDocumentID(int rank) throws IndexOutOfBoundsException,
     IOException {
-    // TODO Auto-generated method stub
-    return 0;
+    checkRank(rank);
+    int subId = 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentID(rank2subRank.getInt(rank));
+    return subId * subRunners.length + rank2runnerIndex.getInt(rank);
   }
 
   /* (non-Javadoc)
@@ -160,8 +166,8 @@
   @Override
   public double getDocumentScore(int rank) throws IndexOutOfBoundsException,
     IOException {
-    // TODO Auto-generated method stub
-    return 0;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentScore(rank2subRank.getInt(rank));
   }
 
   /* (non-Javadoc)
@@ -170,8 +176,8 @@
   @Override
   public List<Binding> getDocumentHits(int rank)
     throws IndexOutOfBoundsException, IOException {
-    // TODO Auto-generated method stub
-    return null;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentHits(rank2subRank.getInt(rank));
   }
 
   /* (non-Javadoc)
@@ -180,8 +186,8 @@
   @Override
   public String[][] getDocumentText(int rank, int termPosition, int length)
     throws IndexException, IndexOutOfBoundsException, IOException {
-    // TODO Auto-generated method stub
-    return null;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentText(rank2subRank.getInt(rank),
 termPosition, length);
   }
 
   /* (non-Javadoc)
@@ -190,8 +196,8 @@
   @Override
   public String getDocumentURI(int rank) throws IndexException,
     IndexOutOfBoundsException, IOException {
-    // TODO Auto-generated method stub
-    return null;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentURI(rank2subRank.getInt(rank));
   }
 
   /* (non-Javadoc)
@@ -200,8 +206,8 @@
   @Override
   public String getDocumentTitle(int rank) throws IndexException,
     IndexOutOfBoundsException, IOException {
-    // TODO Auto-generated method stub
-    return null;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentTitle(rank2subRank.getInt(rank));
   }
 
   /* (non-Javadoc)
@@ -210,8 +216,8 @@
   @Override
   public Serializable getDocumentMetadataField(int rank, String fieldName)
     throws IndexException, IndexOutOfBoundsException, IOException {
-    // TODO Auto-generated method stub
-    return null;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataField(rank2subRank.getInt(rank),
 fieldName);
   }
 
   /* (non-Javadoc)
@@ -221,8 +227,8 @@
   public Map<String, Serializable> getDocumentMetadataFields(int rank,
                                                              Set<String> 
fieldNames)
     throws IndexException, IndexOutOfBoundsException, IOException {
-    // TODO Auto-generated method stub
-    return null;
+    checkRank(rank);
+    return 
subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataFields(rank2subRank.getInt(rank),
 fieldNames);
   }
 
   /* (non-Javadoc)
@@ -231,7 +237,8 @@
   @Override
   public void renderDocument(int rank, Appendable out) throws IOException,
     IndexException {
-    // TODO Auto-generated method stub
+    checkRank(rank);
+    
subRunners[rank2runnerIndex.getInt(rank)].renderDocument(rank2subRank.getInt(rank),
 out);
   }
 
   /* (non-Javadoc)
@@ -239,6 +246,8 @@
    */
   @Override
   public void close() throws IOException {
-    // TODO Auto-generated method stub
+    for(QueryRunner r : subRunners) {
+      r.close();
+    }
   }
 }

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
10 Tips for Better Server Consolidation
Server virtualization is being driven by many needs.  
But none more important than the need to reduce IT complexity 
while improving strategic productivity.  Learn More! 
http://www.accelacomm.com/jaw/sdnl/114/51507609/
_______________________________________________
GATE-cvs mailing list
GATE-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to