Locate directory in HDFS and pass files to the XML parser
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/edc79187 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/edc79187 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/edc79187 Branch: refs/heads/steven/hdfs Commit: edc791878505947ce533476d358a46664fe340db Parents: 5e86d37 Author: efikalti <[email protected]> Authored: Thu Jun 4 19:42:11 2015 +0300 Committer: efikalti <[email protected]> Committed: Thu Jun 4 19:42:11 2015 +0300 ---------------------------------------------------------------------- .../apache/vxquery/hdfs2/HDFSFileFunctions.java | 34 +++----- .../VXQueryCollectionOperatorDescriptor.java | 91 ++++++++++++++++---- .../org/apache/vxquery/xmlparser/XMLParser.java | 22 +++++ 3 files changed, 107 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/edc79187/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java index c0b43ca..2f1d289 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java @@ -1,9 +1,6 @@ package org.apache.vxquery.hdfs2; -import java.io.BufferedReader; -import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -39,13 +36,17 @@ public class HDFSFileFunctions { * @param filename * @return */ - public boolean isLocatedInHDFS(String filename) throws IOException + public boolean isLocatedInHDFS(String filename) { - //search file path - 
if (fs.exists(new Path(filename))) - { - return true; - } + try { + //search file path + if (fs.exists(new Path(filename))) + { + return true; + } + } catch (IOException ex) { + System.err.println(ex); + } //Search every file and folder in the home directory if (searchInDirectory(fs.getHomeDirectory(), filename) != null) { @@ -82,19 +83,8 @@ public class HDFSFileFunctions { return null; } - public void readFile(String filename) throws IOException + public FileSystem getFileSystem() { - Path path = this.searchInDirectory(fs.getHomeDirectory(), filename); - if (path != null) - { - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); - String line; - line = br.readLine(); - while (line != null) { - System.out.println(line); - line = br.readLine(); - } - fs.close(); - } + return this.fs; } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/edc79187/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index 8fdd1ec..c318162 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -16,7 +16,11 @@ */ package org.apache.vxquery.metadata; +import java.io.BufferedReader; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -25,7 +29,12 @@ import java.util.logging.Logger; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.hadoop.fs.FileSystem; +import 
org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.vxquery.context.DynamicContext; +import org.apache.vxquery.hdfs2.HDFSFileFunctions; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; import org.apache.vxquery.xmlparser.TreeNodeIdProvider; import org.apache.vxquery.xmlparser.XMLParser; @@ -50,6 +59,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private List<Integer> childSeq; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); + private final String hdfs_conf_dir = "/home/efi/Utilities/hadoop/etc/hadoop/"; + public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, RecordDescriptor rDesc) { super(spec, 1, 1); @@ -88,25 +99,69 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void nextFrame(ByteBuffer buffer) throws HyracksDataException { fta.reset(buffer); String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); + File collectionDirectory = new File(collectionModifiedName); - - // Go through each tuple. 
- if (collectionDirectory.isDirectory()) { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - @SuppressWarnings("unchecked") - Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), - TrueFileFilter.INSTANCE); - while (it.hasNext()) { - File xmlDocument = it.next(); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); - } - parser.parseElements(xmlDocument, writer, fta, tupleIndex); - } - } - } else { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); + //check if it in the local file system + if(collectionDirectory.exists()) + { + // Go through each tuple. + if (collectionDirectory.isDirectory()) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + @SuppressWarnings("unchecked") + Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), + TrueFileFilter.INSTANCE); + while (it.hasNext()) { + File xmlDocument = it.next(); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); + } + parser.parseElements(xmlDocument, writer, fta, tupleIndex); + } + } + } else { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); + } + } + //check in HDFS file system + else + { + HDFSFileFunctions hdfs = new HDFSFileFunctions(hdfs_conf_dir); + FileSystem fs = hdfs.getFileSystem(); + Path directory = new Path(collectionModifiedName); + Path xmlDocument; + try { + if (fs.exists(directory) && fs.isDirectory(directory)) + { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + //read directory files from HDFS + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + while (it.hasNext()) + { + 
xmlDocument = it.next().getPath(); + if (fs.isFile(xmlDocument)) + { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); + } + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(xmlDocument))); + parser.parseElements(br, writer, fta, tupleIndex); + } + } + } + } + else + { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + System.err.println(e); + } catch (IOException e) { + // TODO Auto-generated catch block + System.err.println(e); + } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/edc79187/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java index 23977d5..e3075d8 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java @@ -129,5 +129,27 @@ public class XMLParser { throw hde; } } + + public void parseElements(BufferedReader br, IFrameWriter writer, FrameTupleAccessor fta, int tupleIndex) + throws HyracksDataException { + try { + Reader input; + if (bufferSize > 0) { + input = br; + in.setCharacterStream(input); + handler.setupElementWriter(writer, fta, tupleIndex); + try { + parser.parse(in); + } catch (SAXException e) { + System.err.println(e); + } + input.close(); + } + } catch (IOException e) { + HyracksDataException hde = new HyracksDataException(e); + hde.setNodeId(nodeId); + throw hde; + } + } } \ No newline at end of file
