Revision: 18208
          http://sourceforge.net/p/gate/code/18208
Author:   ian_roberts
Date:     2014-07-23 12:50:37 +0000 (Wed, 23 Jul 2014)
Log Message:
-----------
Output handler that writes JSON objects concatenated to a single file, Twitter 
stream style, rather than one file per document.

Modified Paths:
--------------
    gcp/trunk/src/gate/cloud/io/IOConstants.java
    gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
    gcp/trunk/src/gate/cloud/util/Tools.java

Added Paths:
-----------
    gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java

Modified: gcp/trunk/src/gate/cloud/io/IOConstants.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/IOConstants.java        2014-07-23 01:21:04 UTC (rev 18207)
+++ gcp/trunk/src/gate/cloud/io/IOConstants.java        2014-07-23 12:50:37 UTC (rev 18208)
@@ -157,6 +157,11 @@
    * streamed JSON object.
    */
   public static final String PARAM_ID_POINTER = "idPointer";
+  
+  /**
+   * Target size for a single output file from a streaming output handler.
+   */
+  public static final String PARAM_CHUNK_SIZE = "chunkSize";
 
   /**
    * XML namespace used for all elements in a batch definition XML file.

Modified: gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java     2014-07-23 01:21:04 UTC (rev 18207)
+++ gcp/trunk/src/gate/cloud/io/json/JSONStreamingInputHandler.java     2014-07-23 12:50:37 UTC (rev 18208)
@@ -64,9 +64,9 @@
  * The input file may be compressed. If the "compression" option is set
  * to "gzip" then the file will be unpacked using Java's native GZIP
  * support. Any other value of "compression" will be treated as a
- * command line to a program that expects the file name as its final
- * parameter and will produce uncompressed output on its standard out.
- * The command will be split into words at whitespace, so embedded
+ * command line to a program that expects the compressed data on its
+ * standard input and will produce uncompressed output on its standard
+ * out. The command will be split into words at whitespace, so embedded
  * whitespace within a single word is not permitted. For example, to
  * handle JSON files compressed in LZO format use
  * <code>compression="lzop -dc"</code>.
@@ -203,6 +203,10 @@
 
   public void startBatch(Batch b) {
     completedDocuments = b.getCompletedDocuments();
+    if(completedDocuments != null && completedDocuments.size() > 0) {
+      logger.info("Restarting failed batch - " + completedDocuments.size()
+              + " documents already processed");
+    }
   }
 
   public void init() throws IOException, GateException {
@@ -213,14 +217,11 @@
       inputStream = new GZIPInputStream(new FileInputStream(srcFile));
     } else {
       // treat compression value as a command line
-      List<String> cmdLine =
-              new ArrayList<String>(Arrays.asList(compression.trim().split(
-                      "\\s+")));
-      cmdLine.add(srcFile.getAbsolutePath());
-      ProcessBuilder pb = new ProcessBuilder(cmdLine);
+      ProcessBuilder pb = new ProcessBuilder(compression.trim().split("\\s+"));
       pb.directory(batchDir);
       pb.redirectError(Redirect.INHERIT);
       pb.redirectOutput(Redirect.PIPE);
+      pb.redirectInput(srcFile);
       decompressProcess = pb.start();
       inputStream = decompressProcess.getInputStream();
     }
@@ -264,6 +265,8 @@
         if(logger.isDebugEnabled()) {
           logger.debug("No ID found in JSON object " + json + " - ignored");
         }
+      } else if(completedDocuments.contains(id)) {
+        // already processed, ignore
       } else {
         DocumentID docId = new DocumentID(id);
         FeatureMap docParams = Factory.newFeatureMap();

Added: gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java                            (rev 0)
+++ gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java    2014-07-23 12:50:37 UTC (rev 18208)
@@ -0,0 +1,224 @@
+/*
+ *  JSONStreamingOutputHandler.java
+ *  Copyright (c) 2007-2014, The University of Sheffield.
+ *
+ *  This file is part of GCP (see http://gate.ac.uk/), and is free
+ *  software, licenced under the GNU Affero General Public License,
+ *  Version 3, November 2007.
+ *
+ *
+ *  $Id$ 
+ */
+package gate.cloud.io.json;
+
+import static gate.cloud.io.IOConstants.PARAM_BATCH_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.PARAM_CHUNK_SIZE;
+import static gate.cloud.io.IOConstants.PARAM_PATTERN;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_NONE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.log4j.Logger;
+
+import gate.Document;
+import gate.cloud.batch.DocumentID;
+import gate.cloud.io.file.JSONOutputHandler;
+import gate.util.GateException;
+
+/**
+ * JSON output handler that concatenates JSON objects into a single
+ * large file rather than putting each one in its own individual file
+ * named for the document ID.
+ * 
+ * @author Ian Roberts
+ * 
+ */
+public class JSONStreamingOutputHandler extends JSONOutputHandler {
+
+  private static final Logger logger = Logger
+          .getLogger(JSONStreamingOutputHandler.class);
+
+  /**
+   * {@link String#format} pattern used to derive the name of each output
+   * chunk from its chunk number. Set in configImpl, where it defaults to
+   * "part-%03d" if not configured.
+   */
+  protected String pattern;
+
+  /**
+   * Size threshold compared against the length of the current chunk
+   * file; once the file is larger than this the chunk is closed and the
+   * next item starts a new one. -1 until configImpl has run.
+   */
+  protected long chunkSize = -1L;
+  
+  /**
+   * Directory containing the batch definition file, or null if the batch
+   * file location was not supplied in the configuration.
+   */
+  protected File batchDir;
+
+  /**
+   * Sentinel placed on the results queue by close() to tell the writer
+   * thread to stop. It is recognised by reference identity, not by
+   * content.
+   */
+  private static final byte[] END_OF_DATA = new byte[0];
+
+  /**
+   * Reads the settings specific to the streaming handler, after letting
+   * the superclass process its own configuration.
+   * <ul>
+   * <li>the batch file location, whose parent directory becomes
+   * {@link #batchDir} (left unchanged if the parameter is absent);</li>
+   * <li>the chunk file name pattern, defaulting to "part-%03d";</li>
+   * <li>the target chunk size, defaulting to 99000000 when the
+   * parameter is missing or is not a valid long.</li>
+   * </ul>
+   * 
+   * @param configData the configuration parameters for this handler
+   * @throws IOException if thrown by the superclass configuration
+   * @throws GateException if thrown by the superclass configuration
+   */
+  @Override
+  protected void configImpl(Map<String, String> configData) throws IOException,
+          GateException {
+    super.configImpl(configData);
+    String batchFileStr = configData.get(PARAM_BATCH_FILE_LOCATION);
+    if(batchFileStr != null) {
+      batchDir = new File(batchFileStr).getParentFile();
+    }
+    pattern = configData.get(PARAM_PATTERN);
+    if(pattern == null) {
+      pattern = "part-%03d";
+    }
+    // start from the default and only override it with a usable value
+    chunkSize = 99000000L;
+    String chunkSizeStr = configData.get(PARAM_CHUNK_SIZE);
+    if(chunkSizeStr == null) {
+      logger.info("Using default chunk size");
+    } else {
+      try {
+        chunkSize = Long.parseLong(chunkSizeStr);
+      } catch(NumberFormatException e) {
+        // a misconfigured value should be visible, not silently ignored
+        logger.warn("Invalid chunk size \"" + chunkSizeStr
+                + "\" - using default chunk size");
+      }
+    }
+  }
+
+  /**
+   * Executor on which the writer thread schedules a waitFor() for each
+   * external compression process once its chunk has been closed. It is
+   * shut down by the writer thread when that thread finishes.
+   */
+  protected ExecutorService processWaiter = Executors.newCachedThreadPool();
+  
+  /**
+   * Per-thread buffer returned from getFileOutputStream, so each calling
+   * thread accumulates the bytes of its current document separately.
+   */
+  protected ThreadLocal<ByteArrayOutputStream> baos =
+          new ThreadLocal<ByteArrayOutputStream>() {
+            protected ByteArrayOutputStream initialValue() {
+              return new ByteArrayOutputStream();
+            }
+          };
+
+  /**
+   * Bounded queue (capacity 100) of serialized documents waiting to be
+   * written by the writer thread; put() blocks while it is full.
+   */
+  protected BlockingQueue<byte[]> results = new ArrayBlockingQueue<byte[]>(100);
+
+  /**
+   * Serializes the document through the superclass into the calling
+   * thread's buffer, appends a newline, and queues the resulting bytes
+   * for the writer thread.
+   * <p>
+   * The buffer is always reset before returning, including when
+   * serialization fails, so that a partially written document can never
+   * be prepended to the next document handled by the same thread.
+   * 
+   * @param document the document to output
+   * @param documentId the ID of the document
+   * @throws IOException if serialization fails, or if the thread is
+   *           interrupted before the result could be queued (the
+   *           interrupt flag is restored first)
+   * @throws GateException if thrown by the superclass
+   */
+  @Override
+  protected void outputDocumentImpl(Document document, DocumentID documentId)
+          throws IOException, GateException {
+    ByteArrayOutputStream buffer = baos.get();
+    byte[] result;
+    try {
+      // NOTE(review): presumably the superclass writes via
+      // getFileOutputStream(documentId), i.e. into this buffer - confirm
+      super.outputDocumentImpl(document, documentId);
+      buffer.write('\n');
+      result = buffer.toByteArray();
+    } finally {
+      buffer.reset();
+    }
+    try {
+      results.put(result);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // the document was not queued, so report the failure rather than
+      // returning as if it had been written
+      throw new IOException("Interrupted while queueing output for document "
+              + documentId, e);
+    }
+  }
+
+  /**
+   * Returns the calling thread's in-memory buffer instead of a stream
+   * onto a per-document file. The document ID is not used.
+   * 
+   * @param docId ignored
+   * @return the {@link ByteArrayOutputStream} belonging to the current
+   *         thread
+   */
+  @Override
+  protected OutputStream getFileOutputStream(DocumentID docId)
+          throws IOException {
+    return baos.get();
+  }
+
+  /**
+   * Starts the background thread that drains the results queue and
+   * writes the chunk files.
+   */
+  @Override
+  public void init() throws IOException, GateException {
+    // NOTE(review): super.init() is not called here - confirm that the
+    // superclass has no initialisation this handler depends on
+    new Thread(new StreamOutputter()).start();
+  }
+
+  /**
+   * Signals the writer thread to finish by queueing the END_OF_DATA
+   * sentinel behind any results that are still pending.
+   */
+  @Override
+  public void close() throws IOException, GateException {
+    // NOTE(review): this returns as soon as the sentinel is queued; it
+    // does not wait for the writer thread to finish writing and close
+    // the last chunk - confirm callers do not rely on that
+    try {
+      results.put(END_OF_DATA);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  protected class StreamOutputter implements Runnable {
+    private File currentFile;
+
+    private int currentChunk = -1;
+
+    private OutputStream currentOutput;
+
+    private Process currentProcess;
+
+    public void run() {
+      byte[] item = null;
+      try {
+        try {
+          int bytesSinceLastCheck = 0;
+          while((item = results.take()) != END_OF_DATA) {
+            if(currentOutput == null) {
+              try {
+                openNextChunk();
+              } catch(IOException e) {
+                logger.error("Failed to open output file", e);
+                
+              }
+            }
+            try {
+              currentOutput.write(item);
+              currentOutput.flush();
+            } catch(IOException e) {
+              logger.warn("Error writing to " + currentFile.getAbsolutePath(), 
e);
+            }
+            bytesSinceLastCheck += item.length;
+            if(bytesSinceLastCheck > 1024 * 1024) {
+              if(currentFile.length() > chunkSize) {
+                closeChunk();
+              }
+              bytesSinceLastCheck = 0;
+            }
+          }
+        } finally {
+          closeChunk();
+          processWaiter.shutdown();
+        }
+      } catch(InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    private void closeChunk() {
+      if(currentOutput != null) {
+        try {
+          currentOutput.close();
+        } catch(IOException e) {
+          logger.warn("Error closing file " + currentFile.getAbsolutePath(), 
e);
+        }
+        if(currentProcess != null) {
+          final Process p = currentProcess;
+          processWaiter.execute(new Runnable() {
+            public void run() {
+              try {
+                p.waitFor();
+              } catch(InterruptedException e) {
+                logger.warn("Interrupted while waiting for process", e);
+                Thread.currentThread().interrupt();
+              }
+            }
+          });
+          currentProcess = null;
+        }
+        currentOutput = null;
+        currentFile = null;
+      }
+    }
+    
+    private void openNextChunk() throws IOException {
+      // if we're restarting we might have to skip some batches
+      do {
+        String newFileName = String.format(pattern, ++currentChunk);
+        currentFile = namingStrategy.toFile(new DocumentID(newFileName));
+      } while(currentFile.exists());
+      if(VALUE_COMPRESSION_GZIP.equals(compression)) {
+        currentOutput = new GZIPOutputStream(new 
FileOutputStream(currentFile));
+      } else if(compression == null || 
VALUE_COMPRESSION_NONE.equals(compression)) {
+        currentOutput = new FileOutputStream(currentFile);
+      } else {
+        // treat compression value as a command line
+        ProcessBuilder pb = new 
ProcessBuilder(compression.trim().split("\\s+"));
+        pb.directory();
+        pb.redirectInput(Redirect.PIPE);
+        pb.redirectOutput(currentFile);
+        pb.redirectError(Redirect.INHERIT);
+        currentProcess = pb.start();
+        currentOutput = currentProcess.getOutputStream();
+      }
+    }
+  }
+
+}


Property changes on: gcp/trunk/src/gate/cloud/io/json/JSONStreamingOutputHandler.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Modified: gcp/trunk/src/gate/cloud/util/Tools.java
===================================================================
--- gcp/trunk/src/gate/cloud/util/Tools.java    2014-07-23 01:21:04 UTC (rev 18207)
+++ gcp/trunk/src/gate/cloud/util/Tools.java    2014-07-23 12:50:37 UTC (rev 18208)
@@ -105,7 +105,13 @@
     writer.writeEndElement(); writer.writeCharacters("\n  ");
     
     writer.writeStartElement(Tools.REPORT_NAMESPACE, "totalDocuments");
-    writer.writeCharacters(Integer.toString(jobData.getTotalDocumentCount()));
+    int totalDocs = jobData.getTotalDocumentCount();
+    if(totalDocs < 0) {
+      // streaming mode, so we don't know totaldocs up front, calculate it
+      // from success and error
+      totalDocs = jobData.getSuccessDocumentCount() + 
jobData.getErrorDocumentCount();
+    }
+    writer.writeCharacters(Integer.toString(totalDocs));
     writer.writeEndElement(); writer.writeCharacters("\n  ");
     
     writer.writeStartElement(Tools.REPORT_NAMESPACE, "successfullyProcessed");

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Want fast and easy access to all the code in your enterprise? Index and
search up to 200,000 lines of code with a free copy of Black Duck
Code Sight - the same software that powers the world's largest code
search on Ohloh, the Black Duck Open Hub! Try it now.
http://p.sf.net/sfu/bds
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to