Changed the parameter in the HDFS parser methods from a URI to an InputStream
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/81c90a48 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/81c90a48 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/81c90a48 Branch: refs/heads/steven/hdfs Commit: 81c90a48ad4980e393497b3ebb8822a3faeb9c16 Parents: d928806 Author: efikalti <[email protected]> Authored: Thu Jun 11 19:07:05 2015 +0300 Committer: efikalti <[email protected]> Committed: Thu Jun 11 19:07:05 2015 +0300 ---------------------------------------------------------------------- .../VXQueryCollectionOperatorDescriptor.java | 11 ++--- .../runtime/functions/util/FunctionHelper.java | 7 ++- .../org/apache/vxquery/xmlparser/XMLParser.java | 46 ++++++++++++++------ 3 files changed, 40 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/81c90a48/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index 0d78cf5..029a857 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -16,11 +16,10 @@ */ package org.apache.vxquery.metadata; -import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -127,7 +126,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO 
//else check in HDFS file system else { - System.out.println("searching in hdfs for file : " + collectionDirectory.getName()); + System.out.println("searching in hdfs for directory : " + collectionDirectory.getName()); HDFSFunctions hdfs = new HDFSFunctions(); FileSystem fs = hdfs.getFileSystem(); if (fs != null) @@ -148,7 +147,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); } - parser.parseHDFSElements(new URI(xmlDocument.getName()), writer, fta, tupleIndex); + InputStream in = fs.open(xmlDocument).getWrappedStream(); + parser.parseHDFSElements(in, writer, fta, tupleIndex); } } } @@ -164,9 +164,6 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } catch (IOException e) { // TODO Auto-generated catch block System.err.println(e); - } catch (URISyntaxException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/81c90a48/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java index 801c249..dd89c5a 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java @@ -21,6 +21,7 @@ import java.io.DataOutput; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -1210,7 +1211,8 @@ public class FunctionHelper { try { if (fs.exists(xmlDocument)) { - 
parser.parseHDFSDocument(new URI(xmlDocument.getName()), abvs); + InputStream in = fs.open(xmlDocument).getWrappedStream(); + parser.parseHDFSDocument( in, abvs); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block @@ -1218,9 +1220,6 @@ public class FunctionHelper { } catch (IOException e) { // TODO Auto-generated catch block System.err.println(e); - } catch (URISyntaxException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/81c90a48/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java index 66d2cf7..291017e 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.ObjectInputStream; import java.io.Reader; @@ -29,9 +30,12 @@ import java.util.List; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.vxquery.context.StaticContext; import org.apache.vxquery.exceptions.VXQueryFileNotFoundException; import org.apache.vxquery.exceptions.VXQueryParseException; +import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.types.SequenceType; import org.xml.sax.InputSource; import org.xml.sax.SAXException; @@ -134,25 +138,41 @@ public class XMLParser { } } - public void parseHDFSElements(URI uri, IFrameWriter writer, FrameTupleAccessor fta, int tupleIndex) throws IOException + public 
void parseHDFSElements(InputStream inputStream, IFrameWriter writer, FrameTupleAccessor fta, int tupleIndex) throws IOException { + try { + Reader input; + if (bufferSize > 0) { + input = new BufferedReader(new InputStreamReader(inputStream), bufferSize); + } else { + input = new InputStreamReader(inputStream); + } + in.setCharacterStream(input); handler.setupElementWriter(writer, fta, tupleIndex); - try { - parser.parse(uri.toString()); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (SAXException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + parser.parse(in); + input.close(); + } catch (IOException e) { + HyracksDataException hde = new HyracksDataException(e); + hde.setNodeId(nodeId); + throw hde; + } catch (SAXException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } - public void parseHDFSDocument(URI uri, ArrayBackedValueStorage abvs) throws HyracksDataException { + public void parseHDFSDocument(InputStream inputStream, ArrayBackedValueStorage abvs) throws HyracksDataException { try { - System.out.println("read hdfs document"); - parser.parse(uri.toString()); + Reader input; + if (bufferSize > 0) { + input = new BufferedReader(new InputStreamReader(inputStream), bufferSize); + } else { + input = new InputStreamReader(inputStream); + } + in.setCharacterStream(input); + parser.parse(in); handler.writeDocument(abvs); + input.close(); } catch (IOException e) { HyracksDataException hde = new HyracksDataException(e); hde.setNodeId(nodeId);
