changed parameter in parser for hdfs from uri to inputstream

Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/81c90a48
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/81c90a48
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/81c90a48

Branch: refs/heads/steven/hdfs
Commit: 81c90a48ad4980e393497b3ebb8822a3faeb9c16
Parents: d928806
Author: efikalti <[email protected]>
Authored: Thu Jun 11 19:07:05 2015 +0300
Committer: efikalti <[email protected]>
Committed: Thu Jun 11 19:07:05 2015 +0300

----------------------------------------------------------------------
 .../VXQueryCollectionOperatorDescriptor.java    | 11 ++---
 .../runtime/functions/util/FunctionHelper.java  |  7 ++-
 .../org/apache/vxquery/xmlparser/XMLParser.java | 46 ++++++++++++++------
 3 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/81c90a48/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index 0d78cf5..029a857 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -16,11 +16,10 @@
  */
 package org.apache.vxquery.metadata;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
@@ -127,7 +126,7 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                 //else check in HDFS file system
                 else
                 {
-                       System.out.println("searching in hdfs for file : " + 
collectionDirectory.getName());
+                       System.out.println("searching in hdfs for directory : " 
+ collectionDirectory.getName());
                        HDFSFunctions hdfs = new HDFSFunctions();
                        FileSystem fs = hdfs.getFileSystem();
                        if (fs != null)
@@ -148,7 +147,8 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                                                                        if 
(LOGGER.isLoggable(Level.FINE)) {
                                                                        
LOGGER.fine("Starting to read XML document: " + xmlDocument.getName());
                                                                    }
-                                                                       
parser.parseHDFSElements(new URI(xmlDocument.getName()), writer, fta, 
tupleIndex);
+                                                                       
InputStream in = fs.open(xmlDocument).getWrappedStream();
+                                                                       
parser.parseHDFSElements(in, writer, fta, tupleIndex);
                                                                }
                                                            }
                                                                }
@@ -164,9 +164,6 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                                                } catch (IOException e) {
                                                        // TODO Auto-generated 
catch block
                                                        System.err.println(e);
-                                               } catch (URISyntaxException e) {
-                                                       // TODO Auto-generated 
catch block
-                                                       e.printStackTrace();
                                                }
                        }
                 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/81c90a48/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
index 801c249..dd89c5a 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
@@ -21,6 +21,7 @@ import java.io.DataOutput;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
@@ -1210,7 +1211,8 @@ public class FunctionHelper {
                        try {
                                        if (fs.exists(xmlDocument))
                                        {
-                                               parser.parseHDFSDocument(new 
URI(xmlDocument.getName()), abvs);
+                                               InputStream in =  
fs.open(xmlDocument).getWrappedStream();
+                                               parser.parseHDFSDocument( in, 
abvs);
                                        }
                                } catch (FileNotFoundException e) {
                                        // TODO Auto-generated catch block
@@ -1218,9 +1220,6 @@ public class FunctionHelper {
                                } catch (IOException e) {
                                        // TODO Auto-generated catch block
                                        System.err.println(e);
-                               } catch (URISyntaxException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
                                }
                }
         }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/81c90a48/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java 
b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
index 66d2cf7..291017e 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
@@ -19,6 +19,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.Reader;
@@ -29,9 +30,12 @@ import java.util.List;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.vxquery.context.StaticContext;
 import org.apache.vxquery.exceptions.VXQueryFileNotFoundException;
 import org.apache.vxquery.exceptions.VXQueryParseException;
+import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.types.SequenceType;
 import org.xml.sax.InputSource;
 import org.xml.sax.SAXException;
@@ -134,25 +138,41 @@ public class XMLParser {
         }
     }
     
-    public void parseHDFSElements(URI uri, IFrameWriter writer, 
FrameTupleAccessor fta, int tupleIndex) throws IOException
+    public void parseHDFSElements(InputStream inputStream, IFrameWriter 
writer, FrameTupleAccessor fta, int tupleIndex) throws IOException
             {
+       try {
+            Reader input;
+            if (bufferSize > 0) {
+                input = new BufferedReader(new InputStreamReader(inputStream), 
bufferSize);
+            } else {
+                input = new InputStreamReader(inputStream);
+            }
+            in.setCharacterStream(input);
             handler.setupElementWriter(writer, fta, tupleIndex);
-                       try {
-                               parser.parse(uri.toString());
-                       } catch (IOException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       } catch (SAXException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
+            parser.parse(in);
+            input.close();
+        } catch (IOException e) {
+            HyracksDataException hde = new HyracksDataException(e);
+            hde.setNodeId(nodeId);
+            throw hde;
+        } catch (SAXException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
     }
     
-    public void parseHDFSDocument(URI uri, ArrayBackedValueStorage abvs) 
throws HyracksDataException {
+    public void parseHDFSDocument(InputStream inputStream, 
ArrayBackedValueStorage abvs) throws HyracksDataException {
         try {
-               System.out.println("read hdfs document");
-            parser.parse(uri.toString());
+            Reader input;
+            if (bufferSize > 0) {
+                input = new BufferedReader(new InputStreamReader(inputStream), 
bufferSize);
+            } else {
+                input = new InputStreamReader(inputStream);
+            }
+            in.setCharacterStream(input);
+            parser.parse(in);
             handler.writeDocument(abvs);
+            input.close();
         } catch (IOException e) {
             HyracksDataException hde = new HyracksDataException(e);
             hde.setNodeId(nodeId);

Reply via email to