Revision: 18830
          http://sourceforge.net/p/gate/code/18830
Author:   markagreenwood
Date:     2015-07-27 15:50:21 +0000 (Mon, 27 Jul 2015)
Log Message:
-----------
initial code drop of support for streaming documents from a CSV file into GCP --
it should work, but so far it hasn't been tested at all; that comes next

Added Paths:
-----------
    gate/trunk/plugins/Format_CSV/gcp/
    gate/trunk/plugins/Format_CSV/gcp/.classpath
    gate/trunk/plugins/Format_CSV/gcp/.project
    gate/trunk/plugins/Format_CSV/gcp/README
    gate/trunk/plugins/Format_CSV/gcp/build.xml
    gate/trunk/plugins/Format_CSV/gcp/src/
    gate/trunk/plugins/Format_CSV/gcp/src/gate/
    gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/
    gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/
    gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/
    gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java

Added: gate/trunk/plugins/Format_CSV/gcp/.classpath
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/.classpath                               (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/.classpath        2015-07-27 15:50:21 UTC (rev 18830)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+       <classpathentry kind="src" path="src"/>
+       <classpathentry kind="con" 
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+       <classpathentry combineaccessrules="false" kind="src" path="/gcp"/>
+       <classpathentry kind="lib" 
path="/home/mark/gate-top/externals/gate/plugins/Format_CSV/lib/opencsv-2.3.jar"/>
+       <classpathentry kind="output" path="classes"/>
+</classpath>

Added: gate/trunk/plugins/Format_CSV/gcp/.project
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/.project                          (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/.project  2015-07-27 15:50:21 UTC (rev 18830)
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+       <name>CSV4GCP</name>
+       <comment></comment>
+       <projects>
+       </projects>
+       <buildSpec>
+               <buildCommand>
+                       <name>org.eclipse.jdt.core.javabuilder</name>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+       </buildSpec>
+       <natures>
+               <nature>org.eclipse.jdt.core.javanature</nature>
+       </natures>
+</projectDescription>

Added: gate/trunk/plugins/Format_CSV/gcp/README
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/README                            (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/README    2015-07-27 15:50:21 UTC (rev 18830)
@@ -0,0 +1,7 @@
+The code in this folder adds support for streaming documents from CSV
+files. It was compiled against version 2.6-SNAPSHOT of GCP.
+
+To recompile this code you need to specify the location of a GCP distribution
+(not an SVN checkout). For example
+
+ant -Dgcp.home=/home/mark/gcp-2.6-SNAPSHOT/

Added: gate/trunk/plugins/Format_CSV/gcp/build.xml
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/build.xml                         (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/build.xml 2015-07-27 15:50:21 UTC (rev 18830)
@@ -0,0 +1,58 @@
+<project name="CSV4GCP" basedir="." default="jar">
+  <!-- Prevent Ant from warning about includeantruntime not being set -->
+  <property name="build.sysclasspath" value="ignore" />
+  
+       <property file="build.properties" />
+
+       <fail unless="gcp.home">"gcp.home" property must be set before CSV support for GCP can be compiled</fail>
+
+       <property name="gcp.lib" location="${gcp.home}/lib" />
+       <property name="src.dir" location="src" />
+       <property name="classes.dir" location="classes" />
+       <property name="jar.location" location="csv4gcp.jar" />
+       <property name="lib" location="../lib" />
+
+       <!-- Path to compile - includes gcp/lib/*.jar -->
+       <path id="compile.classpath">
+               <fileset dir="${lib}">
+                       <include name="**/*.jar" />
+               </fileset>
+               <fileset dir="${gcp.lib}">
+                       <include name="**/*.jar" />
+                       <include name="**/*.zip" />
+               </fileset>
+       </path>
+
+       <!-- create build directory structure -->
+       <target name="prepare">
+               <mkdir dir="${classes.dir}" />
+       </target>
+
+       <target name="resources" depends="prepare">
+               <!-- <copy todir="${classes.dir}/gate/resources" includeEmptyDirs="true">
+                       <fileset dir="${src.dir}/gate/resources" />
+               </copy> -->
+       </target>
+
+       <!-- compile the source -->
+       <target name="compile" depends="prepare, resources">
+               <javac classpathref="compile.classpath" srcdir="${src.dir}" destdir="${classes.dir}" debug="true" debuglevel="lines,source" source="1.5" target="1.5">
+               </javac>
+       </target>
+
+       <!-- create the JAR file -->
+       <target name="jar" depends="compile">
+               <jar destfile="${jar.location}" update="false" basedir="${classes.dir}" />
+       </target>
+
+       <!-- remove the generated .class files -->
+       <target name="clean.classes">
+               <delete dir="${classes.dir}" />
+       </target>
+
+       <!-- Clean up - remove .class and .jar files -->
+       <target name="clean" depends="clean.classes">
+               <delete file="${jar.location}" />
+       </target>
+
+</project>

Added: gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java                              (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java       2015-07-27 15:50:21 UTC (rev 18830)
@@ -0,0 +1,291 @@
+/*
+ * CSVStreamingInputHandler.java
+ * 
+ * Copyright (c) 2015, The University of Sheffield. See the file COPYRIGHT.txt
+ * in the software or at http://gate.ac.uk/gate/COPYRIGHT.txt
+ * 
+ * This file is part of GATE (see http://gate.ac.uk/), and is free software,
+ * licenced under the GNU Library General Public License, Version 2, June 1991
+ * (in the distribution as file licence.html, and also available at
+ * http://gate.ac.uk/gate/licence.html).
+ * 
+ * Mark A. Greenwood, 27/07/2015
+ */
+
+package gate.cloud.io.csv;
+
+import static gate.cloud.io.IOConstants.PARAM_BATCH_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.PARAM_ENCODING;
+import static gate.cloud.io.IOConstants.PARAM_SOURCE_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import gate.Document;
+import gate.Factory;
+import gate.FeatureMap;
+import gate.GateConstants;
+import gate.cloud.batch.Batch;
+import gate.cloud.batch.DocumentID;
+import gate.cloud.io.DocumentData;
+import gate.cloud.io.IOConstants;
+import gate.cloud.io.StreamingInputHandler;
+import gate.util.GateException;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+public class CSVStreamingInputHandler implements StreamingInputHandler {
+
+  public static final String PARAM_SEPARATOR_CHARACTER = "separator";
+
+  public static final String PARAM_QUOTE_CHARACTER = "quote";
+
+  public static final String PARAM_LABELLED_COLUMNS = "labelledColumns";
+
+  public static final String PARAM_COLUMN = "column";
+
+  private static Logger logger = Logger
+    .getLogger(CSVStreamingInputHandler.class);
+
+  /**
+   * Document IDs that are already complete after a previous run of this batch.
+   */
+  protected Set<String> completedDocuments;
+
+  /**
+   * Base directory of the batch.
+   */
+  protected File batchDir;
+
+  /**
+   * The source CSV file from which documents will be streamed.
+   */
+  protected File srcFile;
+
+  protected CSVReader csvReader;
+
+  protected String encoding;
+
+  protected char separatorChar;
+
+  protected char quoteChar;
+
+  protected long idCounter;
+
+  protected int column;
+
+  protected String[] features;
+
+  protected boolean colLabels;
+
+  /**
+   * Compression applied to the input file. This can be
+   * {@link IOConstants#VALUE_COMPRESSION_GZIP} in which case the file will be
+   * unpacked using Java's native GZIP support. Any other value is assumed to be
+   * a command line to an external command that can accept an additional
+   * parameter giving the path to the file and produce the uncompressed data on
+   * its standard output, e.g. "lzop -dc" for .lzo compression.
+   */
+  protected String compression;
+
+  /**
+   * External decompression process, if applicable.
+   */
+  protected Process decompressProcess = null;
+
+  @Override
+  public void config(Map<String, String> configData) throws IOException,
+    GateException {
+
+    String srcFileStr = configData.get(PARAM_SOURCE_FILE_LOCATION);
+    if(srcFileStr == null) {
+      throw new IllegalArgumentException("Parameter " +
+        PARAM_SOURCE_FILE_LOCATION + " is required");
+    } else {
+      String batchFileStr = configData.get(PARAM_BATCH_FILE_LOCATION);
+      if(batchFileStr != null) {
+        batchDir = new File(batchFileStr).getParentFile();
+      }
+      srcFile = new File(srcFileStr);
+      if(!srcFile.isAbsolute()) {
+        srcFile = new File(batchDir, srcFileStr);
+      }
+      if(!srcFile.exists()) { throw new IllegalArgumentException("File \"" +
+        srcFile + "\", provided as value for required parameter \"" +
+        PARAM_SOURCE_FILE_LOCATION + "\", does not exist!"); }
+      if(!srcFile.isFile()) { throw new IllegalArgumentException("File \"" +
+        srcFile + "\", provided as value for required parameter \"" +
+        PARAM_SOURCE_FILE_LOCATION + "\", is not a file!"); }
+    }
+
+    encoding = configData.get(PARAM_ENCODING);
+    separatorChar = configData.get(PARAM_SEPARATOR_CHARACTER).charAt(0);
+    quoteChar = configData.get(PARAM_QUOTE_CHARACTER).charAt(0);
+    colLabels = Boolean.parseBoolean(configData.get(PARAM_LABELLED_COLUMNS));
+    column = Integer.parseInt(configData.get(PARAM_COLUMN));
+  }
+
+  @SuppressWarnings("resource")
+  @Override
+  public void init() throws IOException, GateException {
+    InputStream inputStream = null;
+    if(compression == null) {
+      inputStream = new FileInputStream(srcFile);
+    } else if("any".equals(compression)) {
+      inputStream = new BufferedInputStream(new FileInputStream(srcFile));
+      try {
+        inputStream =
+          new CompressorStreamFactory()
+            .createCompressorInputStream(inputStream);
+      } catch(CompressorException e) {
+        if(e.getCause() != null) {
+          if(e.getCause() instanceof IOException) {
+            throw (IOException)e.getCause();
+          } else {
+            throw new GateException(e.getCause());
+          }
+        } else {
+          // unrecognised signature, assume uncompressed
+          logger
+            .info("Failed to detect compression format, assuming no 
compression");
+        }
+      }
+    } else {
+      if(VALUE_COMPRESSION_GZIP.equals(compression)) {
+        compression = CompressorStreamFactory.GZIP;
+      }
+      inputStream = new BufferedInputStream(new FileInputStream(srcFile));
+      try {
+        inputStream =
+          new CompressorStreamFactory().createCompressorInputStream(
+            compression, inputStream);
+      } catch(CompressorException e) {
+        if(e.getCause() != null) {
+          if(e.getCause() instanceof IOException) {
+            throw (IOException)e.getCause();
+          } else {
+            throw new GateException(e.getCause());
+          }
+        } else {
+          // unrecognised compressor name
+          logger
+            .info("Unrecognised compression format, assuming external 
compressor");
+          IOUtils.closeQuietly(inputStream);
+          // treat compression value as a command line
+          ProcessBuilder pb =
+            new ProcessBuilder(compression.trim().split("\\s+"));
+          pb.directory(batchDir);
+          pb.redirectError(Redirect.INHERIT);
+          pb.redirectOutput(Redirect.PIPE);
+          pb.redirectInput(srcFile);
+          decompressProcess = pb.start();
+          inputStream = decompressProcess.getInputStream();
+        }
+      }
+    }
+
+    csvReader =
+      new CSVReader(new InputStreamReader(inputStream, encoding),
+        separatorChar, quoteChar);
+
+    features = (colLabels ? csvReader.readNext() : null);
+
+    idCounter = 0;
+
+  }
+
+  @Override
+  public DocumentData getInputDocument(DocumentID id) throws IOException,
+    GateException {
+
+    throw new UnsupportedOperationException(
+      "CSVStreamingInputHandler can only operate in streaming mode");
+  }
+
+  @Override
+  public void startBatch(Batch b) {
+    completedDocuments = b.getCompletedDocuments();
+    if(completedDocuments != null && completedDocuments.size() > 0) {
+      logger.info("Restarting failed batch - " + completedDocuments.size() +
+        " documents already processed");
+    }
+  }
+
+  @Override
+  public DocumentData nextDocument() throws IOException, GateException {
+
+    // get the next line from the CSV file
+    String[] nextLine;
+
+    while((nextLine = csvReader.readNext()) != null) {
+
+      // skip the line if there are less columns than we need to get to the
+      // content
+      if(column >= nextLine.length) continue;
+
+      // skip the line if the column with the content is empty
+      if(nextLine[column].trim().equals("")) continue;
+
+      String id = srcFile.getName() + ":" + idCounter++;
+
+      if(completedDocuments != null && completedDocuments.contains(id)) continue;
+
+      DocumentID docId = new DocumentID(id);
+
+      FeatureMap docFeatures = Factory.newFeatureMap();
+      docFeatures.put(GateConstants.THROWEX_FORMAT_PROPERTY_NAME, Boolean.TRUE);
+      
+      if(colLabels) {
+        // copy all the features from the row into a FeatureMap using the
+        // labels from the first line
+        for(int i = 0; i < features.length; ++i) {
+          if(i != column && i < nextLine.length) {
+            docFeatures.put(features[i], nextLine[i]);
+          }
+        }
+      }
+
+      FeatureMap docParams = Factory.newFeatureMap();
+      docParams.put(Document.DOCUMENT_STRING_CONTENT_PARAMETER_NAME,
+        nextLine[column]);
+
+      try {
+        Document gateDoc =
+          (Document)Factory.createResource("gate.corpora.DocumentImpl",
+            docParams, docFeatures, id);
+        return new DocumentData(gateDoc, docId);
+      } catch(Exception e) {
+        logger.warn("Error encountered while parsing object with ID " + id +
+          " - skipped", e);
+      }
+
+    }
+
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException, GateException {
+    csvReader.close();
+    if(decompressProcess != null) {
+      try {
+        decompressProcess.waitFor();
+      } catch(InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+  }
+}
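
For anyone wanting to try the handler before it is wired into a batch, the following
standalone sketch (not part of this commit) shows one way it might be driven for a quick
smoke test. The file path /tmp/documents.csv, the column index, the header-row assumption
and the CSVHandlerSmokeTest/main() wrapper are purely illustrative; it assumes an
uncompressed, UTF-8 encoded CSV file and a classpath containing GCP 2.6-SNAPSHOT, GATE
and opencsv. In a real run GCP itself calls the config/init/startBatch/nextDocument/close
lifecycle methods.

import gate.Gate;
import gate.cloud.io.DocumentData;
import gate.cloud.io.IOConstants;
import gate.cloud.io.csv.CSVStreamingInputHandler;

import java.util.HashMap;
import java.util.Map;

public class CSVHandlerSmokeTest {
  public static void main(String[] args) throws Exception {
    // Factory.createResource() inside the handler needs GATE initialised
    Gate.init();

    // hypothetical configuration; the keys reuse the constants defined in the
    // handler above and in gate.cloud.io.IOConstants
    Map<String, String> params = new HashMap<String, String>();
    params.put(IOConstants.PARAM_SOURCE_FILE_LOCATION, "/tmp/documents.csv");
    params.put(IOConstants.PARAM_ENCODING, "UTF-8");
    params.put(CSVStreamingInputHandler.PARAM_SEPARATOR_CHARACTER, ",");
    params.put(CSVStreamingInputHandler.PARAM_QUOTE_CHARACTER, "\"");
    params.put(CSVStreamingInputHandler.PARAM_LABELLED_COLUMNS, "true");
    params.put(CSVStreamingInputHandler.PARAM_COLUMN, "2");

    CSVStreamingInputHandler handler = new CSVStreamingInputHandler();
    handler.config(params);
    handler.init();

    // startBatch(...) is normally called by GCP with the batch definition and
    // is deliberately skipped in this standalone sketch
    int count = 0;
    DocumentData docData;
    while((docData = handler.nextDocument()) != null) {
      // each DocumentData wraps one GATE document built from one CSV row
      count++;
    }
    System.out.println("Streamed " + count + " documents");

    handler.close();
  }
}

Because the handler only supports streaming mode, getInputDocument() is never used here;
documents are simply pulled one per CSV row until nextDocument() returns null.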
