locate directory in hdfs and pass files to xml parser

Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/edc79187
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/edc79187
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/edc79187

Branch: refs/heads/steven/hdfs
Commit: edc791878505947ce533476d358a46664fe340db
Parents: 5e86d37
Author: efikalti <[email protected]>
Authored: Thu Jun 4 19:42:11 2015 +0300
Committer: efikalti <[email protected]>
Committed: Thu Jun 4 19:42:11 2015 +0300

----------------------------------------------------------------------
 .../apache/vxquery/hdfs2/HDFSFileFunctions.java | 34 +++-----
 .../VXQueryCollectionOperatorDescriptor.java    | 91 ++++++++++++++++----
 .../org/apache/vxquery/xmlparser/XMLParser.java | 22 +++++
 3 files changed, 107 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/edc79187/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java
index c0b43ca..2f1d289 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java
@@ -1,9 +1,6 @@
 package org.apache.vxquery.hdfs2;
 
-import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -39,13 +36,17 @@ public class HDFSFileFunctions {
      * @param filename
      * @return 
      */
-    public boolean isLocatedInHDFS(String filename) throws IOException
+    public boolean isLocatedInHDFS(String filename)
     {
-        //search file path
-        if (fs.exists(new Path(filename)))
-        {
-            return true;
-        }
+        try {
+             //search file path
+             if (fs.exists(new Path(filename)))
+             {
+                 return true;
+             }
+         } catch (IOException ex) {
+             System.err.println(ex);
+         }
         //Search every file and folder in the home directory
         if (searchInDirectory(fs.getHomeDirectory(), filename) != null)
         {
@@ -82,19 +83,8 @@ public class HDFSFileFunctions {
         return null;
     }
     
-    public void readFile(String filename) throws IOException
+    public FileSystem getFileSystem()
     {
-        Path path = this.searchInDirectory(fs.getHomeDirectory(), filename);
-        if (path != null)
-        {
-            BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
-            String line;
-            line = br.readLine();
-            while (line != null) {
-                System.out.println(line);
-                line = br.readLine();
-            }
-            fs.close();
-        }
+       return this.fs;
     }
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/edc79187/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index 8fdd1ec..c318162 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -16,7 +16,11 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -25,7 +29,12 @@ import java.util.logging.Logger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.vxquery.context.DynamicContext;
+import org.apache.vxquery.hdfs2.HDFSFileFunctions;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.XMLParser;
@@ -50,6 +59,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
     private List<Integer> childSeq;
     protected static final Logger LOGGER = 
Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
 
+    private final String hdfs_conf_dir = 
"/home/efi/Utilities/hadoop/etc/hadoop/";
+    
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry 
spec, VXQueryCollectionDataSource ds,
             RecordDescriptor rDesc) {
         super(spec, 1, 1);
@@ -88,25 +99,69 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 fta.reset(buffer);
                 String collectionModifiedName = 
collectionName.replace("${nodeId}", nodeId);
+                
                 File collectionDirectory = new File(collectionModifiedName);
-
-                // Go through each tuple.
-                if (collectionDirectory.isDirectory()) {
-                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); 
++tupleIndex) {
-                        @SuppressWarnings("unchecked")
-                        Iterator<File> it = 
FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
-                                TrueFileFilter.INSTANCE);
-                        while (it.hasNext()) {
-                            File xmlDocument = it.next();
-                            if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine("Starting to read XML document: " 
+ xmlDocument.getAbsolutePath());
-                            }
-                            parser.parseElements(xmlDocument, writer, fta, 
tupleIndex);
-                        }
-                    }
-                } else {
-                    throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
-                            + collectionDirectory.getAbsolutePath() + ") 
passed to collection.");
+                //check if it in the local file system
+                if(collectionDirectory.exists())
+                {
+                       // Go through each tuple.
+                       if (collectionDirectory.isDirectory()) {
+                           for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
+                               @SuppressWarnings("unchecked")
+                               Iterator<File> it = 
FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
+                                       TrueFileFilter.INSTANCE);
+                               while (it.hasNext()) {
+                                   File xmlDocument = it.next();
+                                   if (LOGGER.isLoggable(Level.FINE)) {
+                                       LOGGER.fine("Starting to read XML 
document: " + xmlDocument.getAbsolutePath());
+                                   }
+                                   parser.parseElements(xmlDocument, writer, 
fta, tupleIndex);
+                               }
+                           }
+                       } else {
+                           throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
+                                   + collectionDirectory.getAbsolutePath() + 
") passed to collection.");
+                       }
+                }
+                //check in HDFS file system
+                else
+                {
+                       HDFSFileFunctions hdfs = new 
HDFSFileFunctions(hdfs_conf_dir);
+                       FileSystem fs = hdfs.getFileSystem();
+                       Path directory = new Path(collectionModifiedName);
+                       Path xmlDocument;
+                       try {
+                                               if (fs.exists(directory) && 
fs.isDirectory(directory))
+                                               {
+                                                       for (int tupleIndex = 
0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+                                                       //read directory files 
from HDFS
+                                                       
RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
+                                                   while (it.hasNext())
+                                                   {
+                                                       xmlDocument = 
it.next().getPath();
+                                                       if 
(fs.isFile(xmlDocument))
+                                                       {
+                                                               if 
(LOGGER.isLoggable(Level.FINE)) {
+                                                               
LOGGER.fine("Starting to read XML document: " + xmlDocument.getName());
+                                                           }
+                                                               BufferedReader 
br = new BufferedReader(new InputStreamReader(fs.open(xmlDocument)));
+                                                               
parser.parseElements(br, writer, fta, tupleIndex);
+                                                       }
+                                                   }
+                                                       }
+                                               }
+                                               else
+                                               {
+                                                        throw new 
HyracksDataException("Invalid directory parameter (" + nodeId + ":"
+                                                           + 
collectionDirectory.getAbsolutePath() + ") passed to collection.");
+                                               }
+                                       } catch (FileNotFoundException e) {
+                                               // TODO Auto-generated catch 
block
+                                               System.err.println(e);
+                                       } catch (IOException e) {
+                                               // TODO Auto-generated catch 
block
+                                               System.err.println(e);
+                                       }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/edc79187/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
index 23977d5..e3075d8 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
@@ -129,5 +129,27 @@ public class XMLParser {
             throw hde;
         }
     }
+    
+    public void parseElements(BufferedReader br, IFrameWriter writer, 
FrameTupleAccessor fta, int tupleIndex)
+            throws HyracksDataException {
+        try {
+            Reader input;
+            if (bufferSize > 0) {
+                input = br;
+            in.setCharacterStream(input);
+            handler.setupElementWriter(writer, fta, tupleIndex);
+            try {
+                               parser.parse(in);
+                       } catch (SAXException e) {
+                               System.err.println(e);
+                       }
+            input.close();
+            }
+        }  catch (IOException e) {
+            HyracksDataException hde = new HyracksDataException(e);
+            hde.setNodeId(nodeId);
+            throw hde;
+        }
+    }
 
 }
\ No newline at end of file

Reply via email to