Revision: 17426
          http://sourceforge.net/p/gate/code/17426
Author:   valyt
Date:     2014-02-26 12:15:19 +0000 (Wed, 26 Feb 2014)
Log Message:
-----------
Redesigned the batching implementation to work with a single shared byte buffer 
and object output stream, which allows it to be compatible with the current 
implementation in Mimir 4 (and 5).

Modified Paths:
--------------
    mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java

Modified: mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java
===================================================================
--- mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java	2014-02-26 11:58:22 UTC (rev 17425)
+++ mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java	2014-02-26 12:15:19 UTC (rev 17426)
@@ -22,152 +22,50 @@
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.net.URL;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.Timer;
+import java.util.TimerTask;
 
+import org.apache.log4j.Logger;
+
 /**
  * Utility class that implements the client side of the Mimir RPC indexing
  * protocol. 
  */
 public class MimirConnector {
   
-  /**
-   * Background runnable actually responsible for sending documents to the 
-   * remote server.
-   */
-  protected class DocumentPusher implements Runnable {
-    
-    public DocumentPusher() {
-      byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
-      lastWrite = System.currentTimeMillis();
-    }
-
-    /**
-     * The maximum size of the {@link #byteBuffer}.
-     */
-    protected static final int BYTE_BUFFER_SIZE = 8 * 1024 *1024;
-    
-    protected long lastWrite;
-    /**
-     * A byte buffer accumulating data to be sent to the remote server.
-     */
-    protected ByteArrayOutputStream byteBuffer;
-    
-    @Override
-    public void run() {
-      try {
-        // The logic is: if there is a connectionInterval (positive value), then
-        // check the state with the same interval, to make sure we flush the 
-        // data to the server, even if no more documents are submitted.
-        // If the connection interval is negative, then every time there is
-        // a document in the queue, it will immediately be sent to the server
-        // so we don't need to time-out the poll.
-        byte[] data = inputBuffer.poll(
-            (connectionInterval > 0 ? connectionInterval : Integer.MAX_VALUE), 
-            TimeUnit.MILLISECONDS);
-        while(data != END_OF_LIST) {
-          try{
-            if(data != null) {
-              // if too much data, write the buffer
-              if(data.length + byteBuffer.size() > BYTE_BUFFER_SIZE){
-                writeBuffer(); // this will also empty (reset) the buffer
-              }
-              // add the current document to the buffer
-              byteBuffer.write(data);              
-            }
-            // if too long since last write, write the buffer
-            if(System.currentTimeMillis() - lastWrite > connectionInterval) {
-              writeBuffer();
-            }
-          } catch(IOException e) {
-            // error communicating with the remote end point
-            exception = e;
-          }
-          data = inputBuffer.poll(
-              (connectionInterval > 0 ? connectionInterval : Integer.MAX_VALUE), 
-              TimeUnit.MILLISECONDS);
-        }
-        // we're closing
-        if(byteBuffer.size() > 0) {
-          try{
-            writeBuffer();
-          } catch(IOException e) {
-            // error communicating with the remote end point
-            exception = e;
-          }
-        }
-      } catch(InterruptedException e) {
-        if(closed) {
-          // we're done
-        } else {
-          Thread.currentThread().interrupt();
-        }
-      } 
-    }
-    
-    /**
-     * Writes the current contents of the byte buffer to the remote server. 
-     * @throws IOException 
-     */
-    protected void writeBuffer() throws IOException {
-      if(byteBuffer.size() > 0) {
-        // first phase - call the indexUrl action to find out where to post the
-        // data
-        StringBuilder indexURLString = new StringBuilder(indexURL.toExternalForm());
-        if(indexURLString.length() == 0) {
-          throw new IllegalArgumentException("No index URL specified");
-        }
-        if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
-          // add a slash if necessary
-          indexURLString.append('/');
-        }
-        indexURLString.append("manage/indexUrl");
-        StringBuilder postUrlBuilder = new StringBuilder();
-        
-        webUtils.getText(postUrlBuilder, indexURLString.toString());
-
-        // second phase - post to the URL we were given
-        webUtils.postData(postUrlBuilder.toString(), byteBuffer);
-        byteBuffer.reset();        
-      }
-      
-      lastWrite = System.currentTimeMillis();
-    }
-  }
+  protected static Logger logger = Logger.getLogger(MimirConnector.class);
   
   /**
-   * Constant used to mark the end of input.
+   * The maximum size of the {@link #byteBuffer}.
    */
-  protected static final byte[] END_OF_LIST = new byte[]{};
+  protected static final int BYTE_BUFFER_SIZE = 8 * 1024 *1024;
   
   /**
-   * The size (number of documents) of the {@link #inputBuffer}.
+   * Timer used to regularly check if there is any data to send to the remote 
+   * server.
    */
-  protected static final int INPUT_BUFFER_SIZE = 10;
+  protected Timer backgroundTimer;
   
   /**
-   * A queue holding serialised GATE documents waiting to be sent to the remote
-   * server by the {@link DocumentPusher}.
+   * Has this connector been closed?
    */
-  protected BlockingQueue<byte[]> inputBuffer;
+  protected volatile boolean closed = false;
   
   /**
-   * The background thread used to actually send the documents to the remote 
-   * server.
+   * The last time we sent data to the remote server
    */
-  protected Thread pusherThread;
+  protected volatile long lastWrite;
   
   /**
-   * Has this connector been closed?
+   * A byte buffer accumulating data to be sent to the remote server.
    */
-  protected volatile boolean closed = false;
+  protected ByteArrayOutputStream byteBuffer;
   
   /**
-   * If the background thread encounters a problem, the cause will be stored
-   * here so that it  can reported from the main thread at first opportunity 
+   * An instance of {@link ObjectOutputStream} used to serialise document for
+   * transmission over the wire.
    */
-  protected volatile IOException exception;
+  protected ObjectOutputStream objectOutputStream;
   
   /**
    * The name for the document feature used to hold the document URI.
@@ -198,19 +96,15 @@
    */
   public static final int DEFAULT_CONNECTION_INTERVAL = -1;
   
-  private static MimirConnector staticConnector;
-  
-  public MimirConnector(URL indexUrl, WebUtils webUtils) {
+  public MimirConnector(URL indexUrl, WebUtils webUtils) throws IOException {
     this.indexURL = indexUrl;
     this.webUtils = webUtils;
-    inputBuffer = new ArrayBlockingQueue<>(INPUT_BUFFER_SIZE);
-    DocumentPusher docPusher = new DocumentPusher();
-    pusherThread = new Thread(docPusher, 
-        getClass().getName() + " document pusher");
-    pusherThread.start();
+    byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
+    objectOutputStream = new ObjectOutputStream(byteBuffer);
+    lastWrite = System.currentTimeMillis();
   }
   
-  public MimirConnector(URL indexUrl) {
+  public MimirConnector(URL indexUrl) throws IOException {
     this(indexUrl, new WebUtils());
   }
   
@@ -237,31 +131,31 @@
    */
   public void sendToMimir(Document doc, String documentURI) throws IOException, InterruptedException {
     if(closed) throw new IOException("This Mímir connector has been closed.");
-    
-    if(exception != null) {
-      IOException throwable = new IOException("Exception in background thread",
-          exception);
-      exception = null;
-      throw throwable;
-    }
-    
+
     boolean uriFeatureWasSet = false;
     Object oldUriFeatureValue = null;
 
-    if(documentURI != null) {
+    if(documentURI != null && doc != null) {
       // set the URI as a document feature, saving the old value (if any)
       uriFeatureWasSet = doc.getFeatures().containsKey(MIMIR_URI_FEATURE);
       oldUriFeatureValue = doc.getFeatures().get(MIMIR_URI_FEATURE);
       doc.getFeatures().put(MIMIR_URI_FEATURE, documentURI);
     }
     
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(baos);
-    oos.writeObject(doc);
-    oos.close();
-    inputBuffer.put(baos.toByteArray());
+    synchronized(this) {
+      if(doc != null){
+        objectOutputStream.writeObject(doc);
+      }
+      if(byteBuffer.size() > BYTE_BUFFER_SIZE) {
+        writeBuffer(); // this will also empty (reset) the buffer
+      }
+      // if too long since last write, write the buffer
+      if(System.currentTimeMillis() - lastWrite > connectionInterval) {
+        writeBuffer();
+      }
+    }
 
-    if(documentURI != null) {
+    if(documentURI != null && doc != null) {
       // reset the URI feature to the value it had (or didn't have) before
       if(uriFeatureWasSet) {
         doc.getFeatures().put(MIMIR_URI_FEATURE, oldUriFeatureValue);
@@ -272,6 +166,38 @@
   }
   
   /**
+   * Writes the current contents of the byte buffer to the remote server. 
+   * @throws IOException 
+   */
+  protected synchronized void writeBuffer() throws IOException {
+    if(byteBuffer.size() > 0) {
+      // first phase - call the indexUrl action to find out where to post the
+      // data
+      StringBuilder indexURLString = new StringBuilder(indexURL.toExternalForm());
+      if(indexURLString.length() == 0) {
+        throw new IllegalArgumentException("No index URL specified");
+      }
+      if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
+        // add a slash if necessary
+        indexURLString.append('/');
+      }
+      indexURLString.append("manage/indexUrl");
+      StringBuilder postUrlBuilder = new StringBuilder();
+      
+      webUtils.getText(postUrlBuilder, indexURLString.toString());
+
+      // second phase - post to the URL we were given
+      // close the object OS so that it writes its coda
+      objectOutputStream.close();
+      webUtils.postData(postUrlBuilder.toString(), byteBuffer);
+      byteBuffer.reset();
+      objectOutputStream = new ObjectOutputStream(byteBuffer);
+    }
+    
+    lastWrite = System.currentTimeMillis();
+  }
+  
+  /**
    * Gets the current value for the connection interval (see 
    * {@link #setConnectionInterval(int)}). 
    * @return
@@ -291,8 +217,29 @@
 
    * @param connectionInterval
    */
-  public void setConnectionInterval(int connectionInterval) {
+  public synchronized void setConnectionInterval(int connectionInterval) {
     this.connectionInterval = connectionInterval;
+    if(connectionInterval <= 0 && backgroundTimer != null) {
+      backgroundTimer.cancel();
+    } else {
+      if(backgroundTimer != null) {
+        backgroundTimer.cancel();
+      }
+      backgroundTimer = new Timer(getClass().getName() +  " background timer");
+      // we set a timer task that regularly submits a null value. This causes
+      // the connector to flush any data that is getting too old.
+      backgroundTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          try {
+            sendToMimir(null, null);
+          } catch(Exception e) {
+            // this should never happen
+            logger.error(MimirConnector.class.getName() + " internal error", e);
+          }
+        }
+      }, connectionInterval, connectionInterval);
+    }
   }
 
   /**
@@ -306,15 +253,12 @@
    * waiting to notify the background thread of the termination.
    */
   public void close() throws IOException, InterruptedException {
-    if(exception != null){
-      IOException throwable = new IOException(
-          "Exception in background thread", exception);
-      exception = null;
-      throw throwable;
+    closed = true;
+    if(backgroundTimer != null){
+      backgroundTimer.cancel();
     }
-    closed = true;
-    inputBuffer.put(END_OF_LIST);
-    pusherThread.join();
+    // flush all cached content one last time
+    writeBuffer();
   }
 }
 

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Flow-based real-time traffic analytics software. Cisco certified tool.
Monitor traffic, SLAs, QoS, Medianet, WAAS etc. with NetFlow Analyzer
Customize your own dashboards, set traffic alerts and generate reports.
Network behavioral analysis & security monitoring. All-in-one tool.
http://pubads.g.doubleclick.net/gampad/clk?id=126839071&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to