VXQUERY-131: Support for reading HDFS XML files.

1. User can choose between local and HDFS file systems for data input
2. New collection-with-tag function that reads from HDFS by blocks
3. Collection and Document can both read HDFS
4. Custom InputFormatClass for XML in HDFS
5. Parsing of data from HDFS as whole files
6. Unit tests for HDFS
7. MiniDFS cluster for unit tests
8. Documentation
9. Allow user to pass HDFS config folder as a param to queries
Author: Efi Kaltirimidou github: efikalti


Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/3fc2d6c2
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/3fc2d6c2
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/3fc2d6c2

Branch: refs/heads/master
Commit: 3fc2d6c2f74083513f5a45ccdef2bde8ad23d9a8
Parents: 7de1fba
Author: Steven Jacobs <[email protected]>
Authored: Wed May 18 14:56:53 2016 -0700
Committer: Steven Jacobs <[email protected]>
Committed: Wed May 18 14:56:53 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  83 +++-
 src/site/apt/user_cluster_installation.apt      |   2 +-
 src/site/apt/user_query.apt                     |   1 +
 src/site/apt/user_query_hdfs.apt                | 180 ++++++++
 src/site/site.xml                               |   3 +
 vxquery-cli/pom.xml                             |  12 +
 .../java/org/apache/vxquery/cli/VXQuery.java    |  46 +-
 vxquery-core/pom.xml                            |  61 ++-
 .../compiler/rewriter/RewriteRuleset.java       |   2 +-
 .../rewriter/rules/AbstractCollectionRule.java  |  50 ++-
 .../rewriter/rules/IntroduceCollectionRule.java |  11 +-
 .../vxquery/functions/builtin-functions.xml     |   8 +
 .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 428 +++++++++++++++++++
 .../hdfs2/XmlCollectionWithTagInputFormat.java  | 217 ++++++++++
 .../metadata/VXQueryCollectionDataSource.java   |  12 +-
 .../VXQueryCollectionOperatorDescriptor.java    | 190 +++++++-
 .../metadata/VXQueryMetadataProvider.java       |  20 +-
 .../runtime/functions/util/FunctionHelper.java  |  49 ++-
 .../org/apache/vxquery/xmlparser/XMLParser.java |  49 ++-
 .../xmlquery/query/XMLQueryCompiler.java        |  92 ++--
 .../xmlquery/query/SimpleXQueryTest.java        |  19 +-
 .../src/main/resources/conf/cluster.properties  |   6 +-
 vxquery-xtest/pom.xml                           |  15 +
 .../java/org/apache/vxquery/xtest/MiniDFS.java  |  75 ++++
 .../org/apache/vxquery/xtest/TestRunner.java    |  23 +-
 .../org/apache/vxquery/xtest/XTestOptions.java  |   3 +
 .../vxquery/xtest/AbstractXQueryTest.java       |   1 +
 .../org/apache/vxquery/xtest/VXQueryTest.java   |  19 +
 .../HDFS/Aggregate/avgHDFS.txt                  |   1 +
 .../HDFS/Aggregate/countHDFS.txt                |   1 +
 .../HDFS/Aggregate/maxHDFS.txt                  |   1 +
 .../HDFS/Aggregate/maxvalueHDFS.txt             |   1 +
 .../HDFS/Aggregate/minHDFS.txt                  |   1 +
 .../HDFS/Aggregate/sumHDFS.txt                  |   1 +
 .../Queries/XQuery/HDFS/Aggregate/avgHDFS.xq    |  25 ++
 .../Queries/XQuery/HDFS/Aggregate/countHDFS.xq  |  25 ++
 .../Queries/XQuery/HDFS/Aggregate/maxHDFS.xq    |  25 ++
 .../XQuery/HDFS/Aggregate/maxvalueHDFS.xq       |  23 +
 .../Queries/XQuery/HDFS/Aggregate/minHDFS.xq    |  25 ++
 .../Queries/XQuery/HDFS/Aggregate/sumHDFS.xq    |  25 ++
 .../src/test/resources/VXQueryCatalog.xml       |  15 +
 .../test/resources/cat/HDFSAggregateQueries.xml |  53 +++
 .../test/resources/hadoop/conf/core-site.xml    |  34 ++
 .../test/resources/hadoop/conf/hdfs-site.xml    |  34 ++
 .../test/resources/hadoop/conf/mapred-site.xml  |  41 ++
 45 files changed, 1844 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8a97a92..911206d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -169,6 +170,24 @@
             </roles>
             <timezone>-8</timezone>
         </contributor>
+        <contributor>
+            <name>Menaka Madushanka</name>
+            <email />
+            <organization />
+            <roles>
+                <role>Developer</role>
+            </roles>
+            <timezone>+5:30</timezone>
+        </contributor>
+        <contributor>
+            <name>Riyafa Abdul Hameed</name>
+            <email />
+            <organization />
+            <roles>
+                <role>Developer</role>
+            </roles>
+            <timezone>+5:30</timezone>
+        </contributor>
     </contributors>
 
     <mailingLists>
@@ -295,6 +314,25 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-hdfs-2.x</artifactId>
+                <version>${hyracks.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-hdfs-core</artifactId>
+                <version>${hyracks.version}</version>
+                <type>jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-hdfs</artifactId>
+                <version>2.7.0</version>
+            </dependency>
+
+            <dependency>
                 <groupId>ant</groupId>
                 <artifactId>ant-trax</artifactId>
                 <version>1.6.5</version>
@@ -340,7 +378,7 @@
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
-                <version>1.3.2</version>
+                <version>2.4</version>
             </dependency>
 
             <dependency>
@@ -359,9 +397,10 @@
             <dependency>
                 <groupId>org.mortbay.jetty</groupId>
                 <artifactId>jetty</artifactId>
-                <version>6.1.4</version>
+                <version>6.1.22</version>
                 <scope>compile</scope>
             </dependency>
+
         </dependencies>
     </dependencyManagement>
 
@@ -437,19 +476,21 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
-        <!-- We override the configuration plugin to override the descriptor 
to use for building
-          the source release zip. Specifically, we would like to control the 
inclusions/exclusions.
-          For example, we exclude the KEYS file from the zip -->
+                <!-- We override the configuration plugin to override the 
descriptor 
+                    to use for building the source release zip. Specifically, 
we would like to 
+                    control the inclusions/exclusions. For example, we exclude 
the KEYS file 
+                    from the zip -->
                 <executions>
                     <execution>
-            <!-- Use this id to match the id mentioned in the assembly plugin 
configuration in
-              the apache parent POM under the apache-release profile -->
+                        <!-- Use this id to match the id mentioned in the 
assembly plugin configuration 
+                            in the apache parent POM under the apache-release 
profile -->
                         <id>source-release-assembly</id>
                         <phase>package</phase>
                         <goals>
                             <goal>single</goal>
                         </goals>
-            <!-- combine.self should be override to replace the configuration 
in the parent POM -->
+                        <!-- combine.self should be override to replace the 
configuration in 
+                            the parent POM -->
                         <configuration combine.self="override">
                             
<runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
                             <descriptors>
@@ -491,6 +532,29 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-site-plugin</artifactId>
             </plugin>
+            <plugin>
+            <groupId>com.google.code.maven-replacer-plugin</groupId>
+            <artifactId>replacer</artifactId>
+            <version>1.5.3</version>
+            <executions>
+                <execution>
+                    <phase>prepare-package</phase>
+                    <goals>
+                        <goal>replace</goal>
+                    </goals>                    
+                </execution>
+            </executions>
+            <configuration>
+                <ignoreMissingFile>true</ignoreMissingFile>
+                
<file>vxquery-server/src/main/resources/conf/cluster.properties</file>
+                <outputFile>
+                    vxquery-server/src/main/resources/conf/cluster.properties
+                </outputFile>
+                <regex>false</regex>
+                <token>$CONF_PATH$</token>
+                
<value>${basedir}/vxquery-xtest/src/test/resources/hadoop/conf</value>
+            </configuration>
+           </plugin>
         </plugins>
     </build>
 
@@ -541,6 +605,7 @@
                     <maxmemory>2g</maxmemory>
                 </configuration>
             </plugin>
+            
         </plugins>
     </reporting>
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/apt/user_cluster_installation.apt
----------------------------------------------------------------------
diff --git a/src/site/apt/user_cluster_installation.apt 
b/src/site/apt/user_cluster_installation.apt
index 0e756b0..19e20ed 100644
--- a/src/site/apt/user_cluster_installation.apt
+++ b/src/site/apt/user_cluster_installation.apt
@@ -32,7 +32,7 @@ Cluster Installation
 
   * Apache VXQuery™ source archive (apache-vxquery-X.Y-source-release.zip)
 
-  * JDK >= 1.7
+  * JDK >= 1.8
 
   * Apache Maven >= 3.2
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/apt/user_query.apt
----------------------------------------------------------------------
diff --git a/src/site/apt/user_query.apt b/src/site/apt/user_query.apt
index 8ea6429..c5132c3 100644
--- a/src/site/apt/user_query.apt
+++ b/src/site/apt/user_query.apt
@@ -49,6 +49,7 @@ vxq "path-to"\test.xq
 -showrp                    : Show Runtime plan
 -showtet                   : Show translated expression tree
 -timing                    : Produce timing information
+-hdfs-conf VAL             : The folder containing the HDFS configuration files
 ----------------------------------------
 
 * Java Options

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/apt/user_query_hdfs.apt
----------------------------------------------------------------------
diff --git a/src/site/apt/user_query_hdfs.apt b/src/site/apt/user_query_hdfs.apt
new file mode 100644
index 0000000..fa736b8
--- /dev/null
+++ b/src/site/apt/user_query_hdfs.apt
@@ -0,0 +1,180 @@
+~~ Licensed to the Apache Software Foundation (ASF) under one or more
+~~ contributor license agreements.  See the NOTICE file distributed with
+~~ this work for additional information regarding copyright ownership.
+~~ The ASF licenses this file to You under the Apache License, Version 2.0
+~~ (the "License"); you may not use this file except in compliance with
+~~ the License.  You may obtain a copy of the License at
+~~
+~~     http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License.
+
+Executing a Query in HDFS
+
+
+* 1. Connecting VXQuery with HDFS
+
+  In order to read HDFS data, VXQuery needs access to the HDFS configuration
+  directory, which contains:
+  
+    core-site.xml
+    hdfs-site.xml
+    mapred-site.xml
+    
+  Some systems may automatically set this directory as a system environment
+  variable ("HADOOP_CONF_DIR"). If this is the case, VXQuery will retrieve
+  this automatically when attempting to perform HDFS queries.
+  
+  When this variable is not set, users will need to provide this directory as
+  a Command Line Option when executing VXQuery:
+    -hdfs-conf /path/to/hdfs/conf_folder
+
+
+* 2. Running the Query
+
+
+  For files stored in HDFS there are 2 ways to access them from VXQuery.
+
+
+  [[a]] Reading them as whole files.
+
+
+  [[b]] Reading them block by block.
+
+
+** a. Reading them as whole files.
+
+  For this option you only need to change the path to the files. To indicate that
+  your file(s) are stored in HDFS and should be read from there, you must add
+  <"hdfs:/"> in front of the path. VXQuery will read the path of the files you
+  request in your query and try to locate them.
+
+
+  So in order to run a query that will read the input files from HDFS you need 
+  to make sure that
+
+
+  a) The "HADOOP_CONF_DIR" environment variable is set, or you pass the
+  directory location using -hdfs-conf.
+
+
+  b) The path defined in your query begins with <hdfs://> and the full path to 
+  the file(s).
+
+
+  c) The path exists on HDFS and the user that runs the query has read
+  permission for these files.
+
+
+*** Example
+
+  I want to find all the <books> that are published after 2004.
+
+
+  The file is located in HDFS in this path </user/hduser/store/books.xml>
+
+
+  My query will look like this:
+
+
+----------
+for $x in collection("hdfs://user/hduser/store")
+where $x/year>2004
+return $x/title
+----------
+
+
+  If I want only one file, <<books.xml>>, to be parsed from HDFS, my query will
+  look like this:
+
+
+----------
+for $x in doc("hdfs://user/hduser/store/books.xml")
+where $x/year>2004
+return $x/title
+----------
+
+
+** b. Reading them block by block
+
+
+  In order to use this option you need to modify your query. Instead of using
+  the <collection> or <doc> function to define your input file(s), you need to
+  use <collection-with-tag>.
+
+
+  <collection-with-tag> accepts two arguments: the first is the path to the HDFS
+  directory where you have stored your input files, and the second is a specific
+  <<tag>> that exists in the input file(s). This is the tag of the element that
+  contains the fields that your query is looking for.
+
+  Other than these arguments, you do not need to change anything else in the 
query.
+  
+  Note: since this strategy is optimized to read block by block, the result 
will 
+  include all elements with the given tag, regardless of depth within the xml 
tree.
+
+
+*** Example
+  
+  The same example, using <<collection-with-tag>>.
+
+  My input file <books.xml>:
+
+-----------------------------
+<?xml version="1.0" encoding="UTF-8"?>
+<bookstore>
+
+<book>
+  <title lang="en">Everyday Italian</title>
+  <author>Giada De Laurentiis</author>
+  <year>2005</year>
+  <price>30.00</price>
+</book>
+
+<book>
+  <title lang="en">Harry Potter</title>
+  <author>J K. Rowling</author>
+  <year>2005</year>
+  <price>29.99</price>
+</book>
+
+<book>
+  <title lang="en">XQuery Kick Start</title>
+  <author>James McGovern</author>
+  <author>Per Bothner</author>
+  <author>Kurt Cagle</author>
+  <author>James Linn</author>
+  <author>Vaidyanathan Nagarajan</author>
+  <year>2003</year>
+  <price>49.99</price>
+</book>
+
+<book>
+  <title lang="en">Learning XML</title>
+  <author>Erik T. Ray</author>
+  <year>2003</year>
+  <price>39.95</price>
+</book>
+
+</bookstore>
+----------------------------
+
+
+  My query will look like this:
+
+
+----------------------------
+for $x in collection-with-tag("hdfs://user/hduser/store","book")/book
+where $x/year>2004
+return $x/title
+----------------------------
+
+
+  Take notice that I defined the path to the directory containing the file(s)
+  and not the file itself; <collection-with-tag> expects the path to the directory.
+  I also added </book> after the function call. As with the <collection> and
+  <doc> functions, this is needed for the query to be parsed correctly.

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 5640976..d64fe0d 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -64,6 +64,9 @@ limitations under the License.
                 name="Executing a Query"
                 href="user_query.html" />
             <item
+                name="Using HDFS with VXQuery"
+                href="user_query_hdfs.html" />
+            <item
                 name="Running the Test Suite"
                 href="user_running_tests.html" />
         </menu>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-cli/pom.xml
----------------------------------------------------------------------
diff --git a/vxquery-cli/pom.xml b/vxquery-cli/pom.xml
index 7bf1ff9..c4e11cb 100644
--- a/vxquery-cli/pom.xml
+++ b/vxquery-cli/pom.xml
@@ -126,7 +126,19 @@
             <groupId>org.apache.hyracks</groupId>
             <artifactId>hyracks-dataflow-std</artifactId>
         </dependency>
+        
+        <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-hdfs-core</artifactId>
+        </dependency>
+        
+        <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-hdfs-2.x</artifactId>
+        </dependency>
     </dependencies>
+    
+    
 
     <reporting>
         <plugins>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
----------------------------------------------------------------------
diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java 
b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
index a02c65d..17287c6 100644
--- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
+++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
@@ -81,7 +81,8 @@ public class VXQuery {
     /**
      * Constructor to use command line options passed.
      *
-     * @param opts Command line options object
+     * @param opts
+     *            Command line options object
      */
     public VXQuery(CmdLineOptions opts) {
         this.opts = opts;
@@ -181,8 +182,13 @@ public class VXQuery {
                     opts.showOET, opts.showRP);
 
             start = opts.timing ? new Date() : null;
-            XMLQueryCompiler compiler = new XMLQueryCompiler(listener, 
getNodeList(), opts.frameSize,
-                    opts.availableProcessors, opts.joinHashSize, 
opts.maximumDataSize);
+
+            Map<String, NodeControllerInfo> nodeControllerInfos = null;
+            if (hcc != null) {
+                nodeControllerInfos = hcc.getNodeControllerInfos();
+            }
+            XMLQueryCompiler compiler = new XMLQueryCompiler(listener, 
nodeControllerInfos, opts.frameSize,
+                    opts.availableProcessors, opts.joinHashSize, 
opts.maximumDataSize, opts.hdfsConf);
             resultSetId = createResultSetId();
             CompilerControlBlock ccb = new CompilerControlBlock(new 
StaticContextImpl(RootStaticContextImpl.INSTANCE),
                     resultSetId, null);
@@ -233,30 +239,13 @@ public class VXQuery {
     }
 
     /**
-     * Get cluster node configuration.
-     *
-     * @return Configuration of node controllers as array of Strings.
-     * @throws Exception
-     */
-    private String[] getNodeList() throws Exception {
-        if (hcc != null) {
-            Map<String, NodeControllerInfo> nodeControllerInfos = 
hcc.getNodeControllerInfos();
-            String[] nodeList = new String[nodeControllerInfos.size()];
-            int index = 0;
-            for (String node : nodeControllerInfos.keySet()) {
-                nodeList[index++] = node;
-            }
-            return nodeList;
-        }
-        return new String[0];
-    }
-
-    /**
      * Creates a Hyracks dataset, if not already existing with the job frame 
size, and 1 reader. Allocates a new buffer of size specified in the frame of 
Hyracks
      * node. Creates new dataset reader with the current job ID and result set 
ID. Outputs the string in buffer for each frame.
      *
-     * @param spec   JobSpecification object, containing frame size. Current 
specified job.
-     * @param writer Writer for output of job.
+     * @param spec
+     *            JobSpecification object, containing frame size. Current 
specified job.
+     * @param writer
+     *            Writer for output of job.
      * @throws Exception
      */
     private void runJob(JobSpecification spec, PrintWriter writer) throws 
Exception {
@@ -339,7 +328,8 @@ public class VXQuery {
     /**
      * Reads the contents of file given in query into a String. The file is 
always closed. For XML files UTF-8 encoding is used.
      *
-     * @param query The query with filename to be processed
+     * @param query
+     *            The query with filename to be processed
      * @return UTF-8 formatted query string
      * @throws IOException
      */
@@ -361,8 +351,7 @@ public class VXQuery {
      * Helper class with fields and methods to handle all command line options
      */
     private static class CmdLineOptions {
-        @Option(name = "-available-processors",
-                usage = "Number of available processors. (default: java's 
available processors)")
+        @Option(name = "-available-processors", usage = "Number of available 
processors. (default: java's available processors)")
         private int availableProcessors = -1;
 
         @Option(name = "-client-net-ip-address", usage = "IP Address of the 
ClusterController.")
@@ -422,6 +411,9 @@ public class VXQuery {
         @Option(name = "-x", usage = "Bind an external variable")
         private Map<String, String> bindings = new HashMap<String, String>();
 
+        @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop 
configuration files")
+        private String hdfsConf = null;
+
         @Argument
         private List<String> arguments = new ArrayList<String>();
     }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/pom.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml
index 59cc987..d244818 100644
--- a/vxquery-core/pom.xml
+++ b/vxquery-core/pom.xml
@@ -14,7 +14,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -63,7 +64,8 @@
                         <configuration>
                             <target>
                                 <ant antfile="build-xslt.xml" target="build">
-                                    <property name="build.gen-src.dir" 
value="${project.build.directory}/generated-sources/main/java" />
+                                    <property name="build.gen-src.dir"
+                                        
value="${project.build.directory}/generated-sources/main/java" />
                                     <property name="src.code.dir" 
value="${basedir}/src/main/java" />
                                     <property name="xslt.dir" 
value="${basedir}/src/main/xslt" />
                                     <property name="classpath.xslt" 
refid="maven.compile.classpath" />
@@ -80,7 +82,8 @@
                         <configuration>
                             <target>
                                 <ant antfile="build-site.xml" target="build">
-                                    <property name="build.gen-site.dir" 
value="${project.build.directory}/generated-site/apt" />
+                                    <property name="build.gen-site.dir"
+                                        
value="${project.build.directory}/generated-site/apt" />
                                     <property name="src.code.dir" 
value="${basedir}/src/main/java" />
                                     <property name="xslt.dir" 
value="${basedir}/src/main/xslt" />
                                     <property name="classpath.xslt" 
refid="maven.compile.classpath" />
@@ -121,16 +124,13 @@
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
                 </configuration>
             </plugin>
-      <!--
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-site-plugin</artifactId>
-      </plugin>
-      -->
+            <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> 
<artifactId>maven-site-plugin</artifactId> 
+                </plugin> -->
         </plugins>
         <pluginManagement>
             <plugins>
-        <!--This plugin's configuration is used to store Eclipse m2e settings 
only. It has no influence on the Maven build itself.-->
+                <!--This plugin's configuration is used to store Eclipse m2e 
settings 
+                    only. It has no influence on the Maven build itself. -->
                 <plugin>
                     <groupId>org.eclipse.m2e</groupId>
                     <artifactId>lifecycle-mapping</artifactId>
@@ -209,6 +209,16 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-2.x</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-core</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>ant</groupId>
             <artifactId>ant-trax</artifactId>
             <scope>provided</scope>
@@ -259,6 +269,21 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
     </dependencies>
 
     <reporting>
@@ -290,18 +315,10 @@
                     
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
                 </configuration>
             </plugin>
-      <!--
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <version>2.5.2</version>
-        <configuration>
-          <effort>Min</effort>
-          <threshold>Normal</threshold>
-          <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile>
-        </configuration>
-      </plugin>
-      -->
+            <!-- <plugin> <groupId>org.codehaus.mojo</groupId> 
<artifactId>findbugs-maven-plugin</artifactId> 
+                <version>2.5.2</version> <configuration> <effort>Min</effort> 
<threshold>Normal</threshold> 
+                <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile> 
</configuration> 
+                </plugin> -->
             <plugin>
                 <groupId>org.apache.rat</groupId>
                 <artifactId>apache-rat-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
index fd20465..205e0b2 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
@@ -19,8 +19,8 @@ package org.apache.vxquery.compiler.rewriter;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild;
 import 
org.apache.vxquery.compiler.rewriter.rules.ConsolidateAssignAggregateRule;
+import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild;
 import org.apache.vxquery.compiler.rewriter.rules.ConvertAssignToUnnestRule;
 import 
org.apache.vxquery.compiler.rewriter.rules.ConvertFromAlgebricksExpressionsRule;
 import 
org.apache.vxquery.compiler.rewriter.rules.ConvertToAlgebricksExpressionsRule;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
index 53011d2..74220da 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
@@ -22,16 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
-import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.values.ValueTag;
-import org.apache.vxquery.functions.BuiltinFunctions;
-import org.apache.vxquery.types.BuiltinTypeRegistry;
-import org.apache.vxquery.types.Quantifier;
-import org.apache.vxquery.types.SequenceType;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -46,6 +36,14 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.types.BuiltinTypeRegistry;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
 
 public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule {
     final ByteBufferInputStream bbis = new ByteBufferInputStream();
@@ -54,14 +52,13 @@ public abstract class AbstractCollectionRule implements 
IAlgebraicRewriteRule {
     final TaggedValuePointable tvp = (TaggedValuePointable) 
TaggedValuePointable.FACTORY.createPointable();
 
     /**
-     * Get the constant value for the collection. Return null for not a 
collection.
+     * Get the arguments for the collection and collection-with-tag. Return 
null for not a collection.
      *
      * @param opRef
      *            Logical operator
      * @return collection name
      */
-    protected String getCollectionName(Mutable<ILogicalOperator> opRef) {
-        VXQueryConstantValue constantValue;
+    protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) {
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
@@ -82,13 +79,32 @@ public abstract class AbstractCollectionRule implements 
IAlgebraicRewriteRule {
             return null;
         }
         AbstractFunctionCallExpression functionCall = 
(AbstractFunctionCallExpression) logicalExpression;
-        if 
(!functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier()))
 {
+        if (!functionCall.getFunctionIdentifier()
+                
.equals(BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier())
+                && !functionCall.getFunctionIdentifier()
+                        
.equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) {
             return null;
         }
 
-        ILogicalExpression logicalExpression2 = (ILogicalExpression) 
functionCall.getArguments().get(0).getValue();
+        // Get arguments
+        int size = functionCall.getArguments().size();
+        if (size > 0) {
+            String args[] = new String[size];
+            for (int i = 0; i < size; i++) {
+                args[i] = getArgument(functionCall, opRef, i);
+            }
+            return args;
+        }
+        return null;
+    }
+
+    private String getArgument(AbstractFunctionCallExpression functionCall, 
Mutable<ILogicalOperator> opRef, int pos) {
+        VXQueryConstantValue constantValue;
+        ILogicalExpression logicalExpression2 = (ILogicalExpression) 
functionCall.getArguments().get(pos).getValue();
         if (logicalExpression2.getExpressionTag() != 
LogicalExpressionTag.VARIABLE) {
             return null;
+        } else if (logicalExpression2 == null) {
+            return null;
         }
         VariableReferenceExpression vre = (VariableReferenceExpression) 
logicalExpression2;
         Mutable<ILogicalOperator> opRef3 = 
OperatorToolbox.findProducerOf(opRef, vre.getVariableReference());
@@ -111,7 +127,6 @@ public abstract class AbstractCollectionRule implements 
IAlgebraicRewriteRule {
         } else {
             return null;
         }
-
         // Constant value is now in a TaggedValuePointable. Convert the value 
into a java String.
         tvp.set(constantValue.getValue(), 0, constantValue.getValue().length);
         String collectionName = null;
@@ -121,11 +136,12 @@ public abstract class AbstractCollectionRule implements 
IAlgebraicRewriteRule {
                 
bbis.setByteBuffer(ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(), 
stringp.getStartOffset(),
                         stringp.getLength() + stringp.getStartOffset())), 0);
                 collectionName = di.readUTF();
+                return collectionName;
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
-        return collectionName;
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
index cc857a1..8ed8bb1 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
@@ -64,9 +64,10 @@ public class IntroduceCollectionRule extends 
AbstractCollectionRule {
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
         VXQueryOptimizationContext vxqueryContext = 
(VXQueryOptimizationContext) context;
-        String collectionName = getCollectionName(opRef);
+        String args[] = getCollectionName(opRef);
 
-        if (collectionName != null) {
+        if (args != null) {
+            String collectionName = args[0];
             // Build the new operator and update the query plan.
             int collectionId = vxqueryContext.newCollectionId();
             VXQueryCollectionDataSource ds = 
VXQueryCollectionDataSource.create(collectionId, collectionName,
@@ -74,6 +75,12 @@ public class IntroduceCollectionRule extends 
AbstractCollectionRule {
             if (ds != null) {
                 ds.setTotalDataSources(vxqueryContext.getTotalDataSources());
 
+                // Check if the call is for collection-with-tag
+                if (args.length == 2) {
+                    
ds.setTotalDataSources(vxqueryContext.getTotalDataSources());
+                    ds.setTag(args[1]);
+                }
+
                 // Known to be true because of collection name.
                 AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
                 UnnestOperator unnest = (UnnestOperator) op;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index 38f03a4..3b9371d 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -128,6 +128,14 @@
         <!-- Collection operator is added during the rewrite rules phase.  -->
     </function>
     
+    <!-- fn:collection-with-tag($arg1  as xs:string?, $arg2 as xs:string?) as  
node()* -->
+    <function name="fn:collection-with-tag">
+        <param name="arg1" type="xs:string?"/>
+        <param name="arg2" type="xs:string?"/>
+        <return type="node()*"/>
+        <!-- CollectionWithTag operator is added during the rewrite rules 
phase.  -->
+    </function>
+    
     <!-- fn:compare($comparand1  as xs:string?, $comparand2 as xs:string?)  as 
xs:integer?  -->
     <function name="fn:compare">
         <param name="comparand1" type="xs:string?"/>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
new file mode 100644
index 0000000..dcbfe94
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+public class HDFSFunctions {
+
+    private Configuration conf;
+    private FileSystem fs;
+    private String conf_path;
+    private Job job;
+    private InputFormat inputFormat;
+    private List<InputSplit> splits;
+    private ArrayList<ArrayList<String>> nodes;
+    private HashMap<Integer, String> schedule;
+    private final String TEMP = "java.io.tmpdir";
+    // Destination of the schedule file when it is uploaded with put().
+    private final String dfs_path = "vxquery_splits_schedule.txt";
+    // Local scratch copy of the schedule. Built with java.io.File so that a path separator is
+    // inserted when the java.io.tmpdir property does not end with one (e.g. "/tmp").
+    private final String filepath = new java.io.File(System.getProperty(TEMP), "splits_schedule.txt").getPath();
+    protected static final Logger LOGGER = Logger.getLogger(HDFSFunctions.class.getName());
+    private final Map<String, NodeControllerInfo> nodeControllerInfos;
+
+    /**
+     * Create a new Hadoop Configuration and remember the node controller map and the HDFS
+     * configuration folder. No configuration resources are added and no FileSystem is opened
+     * here; getFileSystem() does both.
+     *
+     * @param nodeControllerInfos
+     *            node controller information, read later by readNodesFromXML()
+     * @param hdfsConf
+     *            path of the HDFS configuration folder; may be null, in which case
+     *            locateConf() falls back to the HADOOP_CONF_DIR environment variable
+     */
+    public HDFSFunctions(Map<String, NodeControllerInfo> nodeControllerInfos, 
String hdfsConf) {
+        this.conf = new Configuration();
+        this.nodeControllerInfos = nodeControllerInfos;
+        this.conf_path = hdfsConf;
+    }
+
+    /**
+     * Create the objects needed for reading the splits of the given path: store the start
+     * and end tag in the configuration, create the job, register the input path, instantiate
+     * XmlCollectionWithTagInputFormat and compute the splits.
+     * This method should run before the scheduleSplits method.
+     * Failures are only logged; in that case the job, input format or splits may stay unset.
+     *
+     * @param filepath
+     *            input path to read
+     * @param tag
+     *            element name without angle brackets; stored as "start_tag" ("<tag>") and
+     *            "end_tag" ("</tag>") in the configuration
+     */
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    public void setJob(String filepath, String tag) {
+        try {
+            // The tags must be in the configuration before the Job is created from it.
+            conf.set("start_tag", "<" + tag + ">");
+            conf.set("end_tag", "</" + tag + ">");
+            job = new Job(conf, "Read from HDFS");
+            Path input = new Path(filepath);
+            FileInputFormat.addInputPath(job, input);
+            job.setInputFormatClass(XmlCollectionWithTagInputFormat.class);
+            inputFormat = 
ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+            splits = inputFormat.getSplits(job);
+        } catch (IOException | ClassNotFoundException | InterruptedException 
e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Tell whether a file can be found in HDFS, either directly under the given path or, by
+     * file name, anywhere below the home directory of the calling user (subdirectories
+     * included).
+     *
+     * @param filename
+     *            path or plain file name to look for
+     * @return true when the path exists or a file with that name is found under the home directory
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    public boolean isLocatedInHDFS(String filename) throws IllegalArgumentException, IOException {
+        // The direct path lookup comes first; the recursive search only runs when it fails.
+        return fs.exists(new Path(filename)) || searchInDirectory(fs.getHomeDirectory(), filename) != null;
+    }
+
+    /**
+     * Look recursively below a directory for a file with the given name.
+     *
+     * @param directory
+     *            root of the search
+     * @param filename
+     *            name of the wanted file (last path segment)
+     * @return path of the first matching file, or null when there is none or the listing fails
+     */
+    public Path searchInDirectory(Path directory, String filename) {
+        try {
+            // Recursive listing: every file below the directory is visited.
+            RemoteIterator<LocatedFileStatus> files = fs.listFiles(directory, true);
+            while (files.hasNext()) {
+                Path candidate = files.next().getPath();
+                String[] segments = candidate.toString().split("/");
+                String lastSegment = segments[segments.length - 1];
+                if (lastSegment.equals(filename)) {
+                    return candidate;
+                }
+            }
+        } catch (IOException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Make sure the HDFS configuration folder is known. When no folder was passed to the
+     * constructor, the HADOOP_CONF_DIR system environment variable is used instead.
+     *
+     * @return true if a configuration folder path is available afterwards
+     */
+    private boolean locateConf() {
+        if (this.conf_path == null) {
+            //As a last resort, try getting the configuration from the system environment
+            //Some systems won't have this set.
+            this.conf_path = System.getenv("HADOOP_CONF_DIR");
+        }
+        return this.conf_path != null;
+    }
+
+    /**
+     * Upload a file/directory to HDFS, replacing the destination if it already exists.
+     *
+     * @param filepath
+     *            path in the local file system
+     * @param dir
+     *            destination path in HDFS
+     * @return true if the copy succeeded; false if no file system is available or the copy failed
+     */
+    public boolean put(String filepath, String dir) {
+        if (this.fs == null) {
+            return false;
+        }
+        Path path = new Path(filepath);
+        Path dest = new Path(dir);
+        try {
+            if (fs.exists(dest)) {
+                fs.delete(dest, true); //recursive delete
+            }
+        } catch (IOException e) {
+            // A failed clean-up is logged only; the copy below is still attempted.
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        try {
+            fs.copyFromLocalFile(path, dest);
+            return true;
+        } catch (IOException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Load core-site.xml and hdfs-site.xml from the configuration folder and open the HDFS
+     * FileSystem for that configuration.
+     *
+     * @return the file system, or null when the configuration folder is unknown or the file
+     *         system cannot be obtained
+     */
+    public FileSystem getFileSystem() {
+        if (!locateConf()) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe("Could not locate HDFS configuration folder.");
+            }
+            return null;
+        }
+        conf.addResource(new Path(this.conf_path + "/core-site.xml"));
+        conf.addResource(new Path(this.conf_path + "/hdfs-site.xml"));
+        try {
+            fs = FileSystem.get(conf);
+            return this.fs;
+        } catch (IOException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Create a HashMap that has as key the hostname and as value the indexes of the splits
+     * whose first reported location is that hostname. Splits that report no location
+     * information are left out of the map; their index is still counted so that the indexes
+     * keep matching the positions in the split list.
+     *
+     * @return map from hostname to split indexes
+     * @throws IOException
+     */
+    public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException {
+        HashMap<String, ArrayList<Integer>> splits_map = new HashMap<String, ArrayList<Integer>>();
+        int i = 0;
+        for (InputSplit s : this.splits) {
+            SplitLocationInfo info[] = s.getLocationInfo();
+            // getLocationInfo() may return null (or nothing) when locations are not available.
+            if (info != null && info.length > 0) {
+                String hostname = info[0].getLocation();
+                ArrayList<Integer> indexes = splits_map.get(hostname);
+                if (indexes == null) {
+                    indexes = new ArrayList<Integer>();
+                    splits_map.put(hostname, indexes);
+                }
+                indexes.add(i);
+            }
+            i++;
+        }
+
+        return splits_map;
+    }
+
+    /**
+     * Build the schedule that maps every split index to the address of the node that should
+     * read it. Splits located on a known node go to that node; the remaining splits are
+     * spread round-robin over the nodes that hold no split.
+     * setJob must have been called before, so that the splits exist.
+     *
+     * @throws IOException
+     * @throws ParserConfigurationException
+     * @throws SAXException
+     */
+    public void scheduleSplits() throws IOException, ParserConfigurationException, SAXException {
+        schedule = new HashMap<Integer, String>();
+        ArrayList<String> empty = new ArrayList<String>();
+        HashMap<String, ArrayList<Integer>> splits_map = this.getLocationsOfSplits();
+        readNodesFromXML();
+        int count = this.splits.size();
+
+        ArrayList<Integer> splits;
+        String node;
+        for (ArrayList<String> info : this.nodes) {
+            // Position 1 holds the network address stored by readNodesFromXML().
+            node = info.get(1);
+            if (splits_map.containsKey(node)) {
+                splits = splits_map.get(node);
+                for (Integer split : splits) {
+                    schedule.put(split, node);
+                    count--;
+                }
+                splits_map.remove(node);
+            } else {
+                empty.add(node);
+            }
+        }
+
+        //Check if every split got assigned to a node
+        if (count != 0) {
+            ArrayList<Integer> remaining = new ArrayList<Integer>();
+            // Find remaining splits. The index has to advance with the loop; a counter that is
+            // re-initialised on every iteration would only ever test split 0.
+            for (int i = 0; i < this.splits.size(); i++) {
+                if (!schedule.containsKey(i)) {
+                    remaining.add(i);
+                }
+            }
+
+            // NOTE(review): when no node is idle, the remaining splits stay unscheduled.
+            if (empty.size() != 0) {
+                int node_number = 0;
+                for (int split : remaining) {
+                    if (node_number == empty.size()) {
+                        node_number = 0;
+                    }
+                    schedule.put(split, empty.get(node_number));
+                    node_number++;
+                }
+            }
+        }
+    }
+
+    /**
+     * Collect the node id and the network address of every node controller and save them
+     * inside nodes, one two-element list (id, address) per node.
+     * Despite the method name no XML is parsed; the data comes from the node controller map
+     * given to the constructor. The declared exceptions are kept for existing callers.
+     *
+     * @throws ParserConfigurationException
+     * @throws IOException
+     * @throws SAXException
+     */
+    public void readNodesFromXML() throws ParserConfigurationException, SAXException, IOException {
+        nodes = new ArrayList<ArrayList<String>>();
+        for (NodeControllerInfo ncInfo : nodeControllerInfos.values()) {
+            //Will this include the master node? Is that bad?
+            ArrayList<String> info = new ArrayList<String>();
+            info.add(ncInfo.getNodeId());
+            info.add(ncInfo.getNetworkAddress().getAddress());
+            nodes.add(info);
+        }
+    }
+
+    /**
+     * Writes the schedule to a temporary file, one "split,node" entry per line, then uploads
+     * the file to the HDFS.
+     *
+     * @throws UnsupportedEncodingException
+     * @throws FileNotFoundException
+     */
+    public void addScheduleToDistributedCache() throws FileNotFoundException, UnsupportedEncodingException {
+        PrintWriter writer = new PrintWriter(filepath, "UTF-8");
+        try {
+            for (Map.Entry<Integer, String> entry : this.schedule.entrySet()) {
+                // Terminate every entry; without a separator consecutive entries run together
+                // and cannot be told apart when the file is read back.
+                writer.write(entry.getKey() + "," + entry.getValue() + "\n");
+            }
+        } finally {
+            writer.close();
+        }
+        // Add file to HDFS
+        this.put(filepath, dfs_path);
+    }
+
+    /**
+     * Create and initialize a record reader over the splits computed by setJob.
+     * NOTE(review): the loop returns as soon as one reader has been created and initialized,
+     * so a later split is only reached when every earlier one failed - confirm that reading
+     * a single split is intended.
+     *
+     * @return an initialized reader, or null when no reader could be created
+     */
+    public RecordReader getReader() {
+
+        // The splits are cast to FileSplit because FileSplitsFactory is built from a FileSplit list.
+        List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+        for (int i = 0; i < splits.size(); i++) {
+            fileSplits.add((FileSplit) splits.get(i));
+        }
+        FileSplitsFactory splitsFactory;
+        try {
+            splitsFactory = new FileSplitsFactory(fileSplits);
+            List<FileSplit> inputSplits = splitsFactory.getSplits();
+            ContextFactory ctxFactory = new ContextFactory();
+            int size = inputSplits.size();
+            for (int i = 0; i < size; i++) {
+                /**
+                 * Create the reader for this split; a failure is logged and the next split is tried.
+                 */
+                TaskAttemptContext context;
+                try {
+                    context = ctxFactory.createContext(job.getConfiguration(), 
i);
+                    RecordReader reader = 
inputFormat.createRecordReader(inputSplits.get(i), context);
+                    reader.initialize(inputSplits.get(i), context);
+                    return reader;
+                } catch (IOException | InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe(e.getMessage());
+                    }
+                }
+            }
+        } catch (HyracksDataException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the schedule built by scheduleSplits (split index to node), or null when
+     *         scheduleSplits has not assigned it yet.
+     */
+    public HashMap<Integer, String> getSchedule() {
+        return this.schedule;
+    }
+
+    /**
+     * Collect the indexes of the splits that the existing schedule assigns to a node.
+     *
+     * @param node
+     *            node value as stored in the schedule
+     * @return indexes of the splits scheduled on that node; empty when there are none
+     */
+    public ArrayList<Integer> getScheduleForNode(String node) {
+        ArrayList<Integer> assigned = new ArrayList<Integer>();
+        for (Map.Entry<Integer, String> entry : this.schedule.entrySet()) {
+            int splitIndex = entry.getKey();
+            if (node.equals(entry.getValue())) {
+                assigned.add(splitIndex);
+            }
+        }
+        return assigned;
+    }
+
+    /**
+     * @return the splits computed by setJob, or null when they have not been computed.
+     */
+    public List<InputSplit> getSplits() {
+        return this.splits;
+    }
+
+    /**
+     * @return the job created by setJob, or null when none has been created.
+     */
+    public Job getJob() {
+        return this.job;
+    }
+
+    /**
+     * @return the input format instantiated by setJob, or null when none has been created.
+     */
+    public InputFormat getinputFormat() {
+        return this.inputFormat;
+    }
+
+    /**
+     * Parse a string holding XML into a DOM document.
+     *
+     * @param xmlStr
+     *            XML text
+     * @return the parsed document, or null when parsing fails (the error is logged)
+     */
+    public Document convertStringToDocument(String xmlStr) {
+        DocumentBuilderFactory parserFactory = DocumentBuilderFactory.newInstance();
+        try {
+            DocumentBuilder parser = parserFactory.newDocumentBuilder();
+            return parser.parse(new InputSource(new StringReader(xmlStr)));
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
new file mode 100644
index 0000000..1d053b6
--- /dev/null
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
+public class XmlCollectionWithTagInputFormat extends TextInputFormat {
+
+    public static String STARTING_TAG;
+    public static String ENDING_TAG;
+
+    /**
+     * Create an XmlRecordReader for the split. The "start_tag" and "end_tag" configuration
+     * values are copied into the static STARTING_TAG and ENDING_TAG fields first.
+     * NOTE(review): those fields are shared by all readers, so formats created with different
+     * tags overwrite each other's values.
+     *
+     * @return the reader, or null when it could not be opened (the IOException is dropped)
+     */
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(InputSplit 
split, TaskAttemptContext context) {
+        try {
+            STARTING_TAG = context.getConfiguration().get("start_tag");
+            ENDING_TAG = context.getConfiguration().get("end_tag");
+            return new XmlRecordReader((FileSplit) split, 
context.getConfiguration());
+        } catch (IOException ioe) {
+            return null;
+        }
+    }
+
+    /**
+     * XMLRecordReader class to read through a given xml document to output 
xml blocks as records as specified
+     * by the end tag
+     */
+    public static class XmlRecordReader extends RecordReader<LongWritable, 
Text> {
+
+        private final byte[] end_tag;
+        private final byte[] start_tag;
+        private final long start;
+        private final long end;
+        private final FSDataInputStream fsin;
+        private final DataOutputBuffer buffer = new DataOutputBuffer();
+        private LongWritable currentKey;
+        private Text currentValue;
+        BlockLocation[] blocks;
+        public static byte[] nl = "\n".getBytes();
+
+        /**
+         * Open the file of the split and position the stream at the start of the split.
+         *
+         * @param split
+         *            part of the file this reader is responsible for
+         * @param conf
+         *            configuration; its "start_tag" and "end_tag" values delimit the records
+         * @throws IOException
+         *             if the file cannot be inspected, opened or positioned
+         */
+        public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+            // Take the tags from this reader's own configuration. The static fields are shared
+            // by all readers and serve only as fallback when the configuration has no tags.
+            String startTag = conf.get("start_tag", STARTING_TAG);
+            String endTag = conf.get("end_tag", ENDING_TAG);
+            end_tag = endTag.getBytes(Charsets.UTF_8);
+            start_tag = startTag.getBytes(Charsets.UTF_8);
+
+            // boundaries of the split inside the file
+            start = split.getStart();
+            end = start + split.getLength();
+            Path file = split.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            FileStatus fStatus = fs.getFileStatus(file);
+            blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
+            // open the file and seek to the start of the split
+            FSDataInputStream in = fs.open(file);
+            try {
+                in.seek(start);
+            } catch (IOException e) {
+                // Do not leak the stream when the reader cannot be constructed.
+                Closeables.close(in, true);
+                throw e;
+            }
+            fsin = in;
+        }
+
+        /**
+         * Read the content of the split into the given key/value pair.
+         *
+         * @param key
+         *            receives the stream position after reading
+         * @param value
+         *            receives the bytes collected in the buffer
+         * @return true when something was read, false when the split is exhausted or holds no record
+         * @throws IOException
+         */
+        private boolean next(LongWritable key, Text value) throws IOException {
+            if (fsin.getPos() >= end) {
+                return false;
+            }
+            try {
+                if (!readBlock(true)) {
+                    return false;
+                }
+                key.set(fsin.getPos());
+                value.set(buffer.getData(), 0, buffer.getLength());
+                return true;
+            } finally {
+                // The buffer is emptied whether or not a record was produced.
+                buffer.reset();
+            }
+        }
+
+        /**
+         * Close the input stream. The call passes true as second argument to Closeables.close,
+         * which asks it to swallow an IOException raised while closing.
+         */
+        @Override
+        public void close() throws IOException {
+            Closeables.close(fsin, true);
+        }
+
+        /**
+         * Report how much of the split has been consumed.
+         *
+         * @return fraction of the split already read; 0 for an empty split, at most 1 even
+         *         when the stream has moved past the end of the split to finish a record
+         */
+        @Override
+        public float getProgress() throws IOException {
+            if (end == start) {
+                // An empty split would otherwise yield NaN (0 / 0).
+                return 0.0f;
+            }
+            return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
+        }
+
+        /**
+         * Collect every record that starts inside the split into the buffer. A record that
+         * starts before the end of the split is read up to its closing tag, even when that
+         * tag lies after the end of the split.
+         *
+         * @param withinBlock
+         *            not used
+         * @return true when at least one start tag was found
+         * @throws IOException
+         */
+        private boolean readBlock(boolean withinBlock) throws IOException {
+            boolean foundRecord = false;
+            while (fsin.getPos() < end) {
+                if (!readUntilMatch(start_tag, false)) {
+                    continue;
+                }
+                // The start tag was consumed while searching, so it is put back by hand.
+                buffer.write(start_tag);
+                readUntilMatch(end_tag, true);
+                foundRecord = true;
+            }
+            return foundRecord;
+        }
+
+        /**
+         * Read from the stream until the bytes of match[] have been seen or the end of file is
+         * reached.
+         *
+         * @param match
+         *            byte sequence to look for
+         * @param withinBlock
+         *            true to copy every byte read into the buffer; false to skip bytes and to
+         *            give up once the end of the split is passed with no match in progress
+         * @return true when the sequence was found
+         * @throws IOException
+         */
+        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
+            int i = 0;
+            while (true) {
+                int b = fsin.read();
+                // end of file:
+                if (b == -1) {
+                    return false;
+                }
+                // save to buffer:
+                if (withinBlock) {
+                    buffer.write(b);
+                }
+
+                // check if we're matching:
+                if (b == match[i]) {
+                    i++;
+                    if (i >= match.length) {
+                        return true;
+                    }
+                } else {
+                    // The byte that broke a partial match may itself open a new one
+                    // (e.g. "<<tag>"), so it must not be thrown away.
+                    i = (b == match[0]) ? 1 : 0;
+                }
+                // see if we've passed the stop point:
+                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+                    return false;
+                }
+            }
+        }
+
+        /**
+         * Find the block that follows the current stream position.
+         *
+         * @return index of the block after the one ending exactly at the current position,
+         *         or 0 when the position is not at the end of any block
+         * @throws IOException
+         */
+        private int nextBlock() throws IOException {
+            final long position = fsin.getPos();
+            int following = 0;
+            for (BlockLocation block : blocks) {
+                following++;
+                long blockEnd = block.getOffset() + block.getLength();
+                if (position == blockEnd) {
+                    return following;
+                }
+            }
+            return 0;
+        }
+
+        /**
+         * @return the key set by the last call of nextKeyValue; null before the first call.
+         */
+        @Override
+        public LongWritable getCurrentKey() throws IOException, 
InterruptedException {
+            return currentKey;
+        }
+
+        /**
+         * @return the value set by the last call of nextKeyValue; null before the first call.
+         */
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException 
{
+            return currentValue;
+        }
+
+        /**
+         * Does nothing: the constructor already opens the file and seeks to the start of the split.
+         */
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) 
throws IOException, InterruptedException {
+        }
+
+        /**
+         * Allocate a fresh key and value and fill them through next().
+         *
+         * @return true when next() produced a record
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException 
{
+            // New objects on every call, so previously returned key/value objects are not overwritten.
+            currentKey = new LongWritable();
+            currentValue = new Text();
+            return next(currentKey, currentValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index 62d1ca7..3c2d6aa 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -37,6 +37,7 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
     private String[] collectionPartitions;
     private final List<Integer> childSeq;
     private int totalDataSources;
+    private String tag;
 
     private final Object[] types;
 
@@ -60,6 +61,7 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
             }
         };
         this.childSeq = new ArrayList<Integer>();
+        this.tag = null;
     }
 
     public int getTotalDataSources() {
@@ -77,7 +79,7 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
     public String[] getPartitions() {
         return collectionPartitions;
     }
-    
+
     public void setPartitions(String[] collectionPartitions) {
         this.collectionPartitions = collectionPartitions;
     }
@@ -86,6 +88,14 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
         return collectionPartitions.length;
     }
 
+    /**
+     * @return the tag stored for this data source, or {@code null} when none has been set
+     *         (the constructor initialises it to {@code null})
+     */
+    public String getTag() {
+        return tag;
+    }
+
+    /**
+     * Stores the tag for this data source.
+     *
+     * @param tag the new tag value; it is stored as given, without validation
+     */
+    public void setTag(final String tag) {
+        this.tag = tag;
+    }
+
     @Override
     public String getId() {
         return collectionName;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index d5966b8..b8dca63 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -16,15 +16,37 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameFieldAppender;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -38,10 +60,14 @@ import 
org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import org.apache.vxquery.context.DynamicContext;
+import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.XMLParser;
+import org.xml.sax.SAXException;
 
 public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -50,15 +76,23 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
     private String[] collectionPartitions;
     private List<Integer> childSeq;
     protected static final Logger LOGGER = 
Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
+    private HDFSFunctions hdfs;
+    private String tag;
+    private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" 
standalone=\"yes\"?>\n";
+    private final String hdfsConf;
+    private final Map<String, NodeControllerInfo> nodeControllerInfos;
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry 
spec, VXQueryCollectionDataSource ds,
-            RecordDescriptor rDesc) {
+            RecordDescriptor rDesc, String hdfsConf, Map<String, 
NodeControllerInfo> nodeControllerInfos) {
         super(spec, 1, 1);
         collectionPartitions = ds.getPartitions();
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
         recordDescriptors[0] = rDesc;
+        this.tag = ds.getTag();
+        this.hdfsConf = hdfsConf;
+        this.nodeControllerInfos = nodeControllerInfos;
     }
 
     @Override
@@ -83,31 +117,155 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
             public void open() throws HyracksDataException {
                 appender.reset(frame, true);
                 writer.open();
+                hdfs = new HDFSFunctions(nodeControllerInfos, hdfsConf);
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 fta.reset(buffer);
                 String collectionModifiedName = 
collectionName.replace("${nodeId}", nodeId);
-                File collectionDirectory = new File(collectionModifiedName);
-
-                // Go through each tuple.
-                if (collectionDirectory.isDirectory()) {
-                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); 
++tupleIndex) {
-                        @SuppressWarnings("unchecked")
-                        Iterator<File> it = 
FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
-                                TrueFileFilter.INSTANCE);
-                        while (it.hasNext()) {
-                            File xmlDocument = it.next();
-                            if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine("Starting to read XML document: " 
+ xmlDocument.getAbsolutePath());
+                if (!collectionModifiedName.contains("hdfs:/")) {
+                    File collectionDirectory = new 
File(collectionModifiedName);
+                    //check if directory is in the local file system
+                    if (collectionDirectory.exists()) {
+                        // Go through each tuple.
+                        if (collectionDirectory.isDirectory()) {
+                            for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
+                                Iterator<File> it = 
FileUtils.iterateFiles(collectionDirectory,
+                                        new VXQueryIOFileFilter(), 
TrueFileFilter.INSTANCE);
+                                while (it.hasNext()) {
+                                    File xmlDocument = it.next();
+                                    if (LOGGER.isLoggable(Level.FINE)) {
+                                        LOGGER.fine("Starting to read XML 
document: " + xmlDocument.getAbsolutePath());
+                                    }
+                                    parser.parseElements(xmlDocument, writer, 
tupleIndex);
+                                }
                             }
-                            parser.parseElements(xmlDocument, writer, 
tupleIndex);
+                        } else {
+                            throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
+                                    + collectionDirectory.getAbsolutePath() + 
") passed to collection.");
                         }
                     }
                 } else {
-                    throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
-                            + collectionDirectory.getAbsolutePath() + ") 
passed to collection.");
+                    // Else check in HDFS file system
+                    // Get instance of the HDFS filesystem
+                    FileSystem fs = hdfs.getFileSystem();
+                    if (fs != null) {
+                        collectionModifiedName = 
collectionModifiedName.replaceAll("hdfs:/", "");
+                        Path directory = new Path(collectionModifiedName);
+                        Path xmlDocument;
+                        if (tag != null) {
+                            hdfs.setJob(directory.toString(), tag);
+                            tag = "<" + tag + ">";
+                            Job job = hdfs.getJob();
+                            InputFormat inputFormat = hdfs.getinputFormat();
+                            try {
+                                hdfs.scheduleSplits();
+                                ArrayList<Integer> schedule = hdfs
+                                        
.getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
+                                List<InputSplit> splits = hdfs.getSplits();
+                                List<FileSplit> fileSplits = new 
ArrayList<FileSplit>();
+                                for (int i : schedule) {
+                                    fileSplits.add((FileSplit) splits.get(i));
+                                }
+                                FileSplitsFactory splitsFactory = new 
FileSplitsFactory(fileSplits);
+                                List<FileSplit> inputSplits = 
splitsFactory.getSplits();
+                                ContextFactory ctxFactory = new 
ContextFactory();
+                                int size = inputSplits.size();
+                                InputStream stream;
+                                String value;
+                                RecordReader reader;
+                                TaskAttemptContext context;
+                                for (int i = 0; i < size; i++) {
+                                    //read split
+                                    context = 
ctxFactory.createContext(job.getConfiguration(), i);
+                                    try {
+                                        reader = 
inputFormat.createRecordReader(inputSplits.get(i), context);
+                                        reader.initialize(inputSplits.get(i), 
context);
+                                        while (reader.nextKeyValue()) {
+                                            value = 
reader.getCurrentValue().toString();
+                                            //Split value if it contains more 
than one item with the tag
+                                            if 
(StringUtils.countMatches(value, tag) > 1) {
+                                                String items[] = 
value.split(tag);
+                                                for (String item : items) {
+                                                    if (item.length() > 0) {
+                                                        item = START_TAG + tag 
+ item;
+                                                        stream = new 
ByteArrayInputStream(
+                                                                
item.getBytes(StandardCharsets.UTF_8));
+                                                        
parser.parseHDFSElements(stream, writer, fta, i);
+                                                    }
+                                                }
+                                            } else {
+                                                value = START_TAG + value;
+                                                //create an input stream to 
the file currently reading and send it to parser
+                                                stream = new 
ByteArrayInputStream(
+                                                        
value.getBytes(StandardCharsets.UTF_8));
+                                                
parser.parseHDFSElements(stream, writer, fta, i);
+                                            }
+                                        }
+
+                                    } catch (InterruptedException e) {
+                                        if (LOGGER.isLoggable(Level.SEVERE)) {
+                                            LOGGER.severe(e.getMessage());
+                                        }
+                                    }
+                                }
+
+                            } catch (IOException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            } catch (ParserConfigurationException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            } catch (SAXException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            }
+                        } else {
+                            try {
+                                //check if the path exists and is a directory
+                                if (fs.exists(directory) && 
fs.isDirectory(directory)) {
+                                    for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
+                                        //read every file in the directory
+                                        RemoteIterator<LocatedFileStatus> it = 
fs.listFiles(directory, true);
+                                        while (it.hasNext()) {
+                                            xmlDocument = it.next().getPath();
+                                            if (fs.isFile(xmlDocument)) {
+                                                if 
(LOGGER.isLoggable(Level.FINE)) {
+                                                    LOGGER.fine(
+                                                            "Starting to read 
XML document: " + xmlDocument.getName());
+                                                }
+                                                //create an input stream to 
the file currently reading and send it to parser
+                                                InputStream in = 
fs.open(xmlDocument).getWrappedStream();
+                                                parser.parseHDFSElements(in, 
writer, fta, tupleIndex);
+                                            }
+                                        }
+                                    }
+                                } else {
+                                    throw new HyracksDataException("Invalid 
HDFS directory parameter (" + nodeId + ":"
+                                            + directory + ") passed to 
collection.");
+                                }
+                            } catch (FileNotFoundException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            } catch (IOException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            }
+                        }
+                        try {
+                            fs.close();
+                        } catch (IOException e) {
+                            if (LOGGER.isLoggable(Level.SEVERE)) {
+                                LOGGER.severe(e.getMessage());
+                            }
+                        }
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index 238f6d3..820c365 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.vxquery.context.StaticContext;
-
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -45,6 +43,7 @@ import 
org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -52,16 +51,22 @@ import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.vxquery.context.StaticContext;
 
 public class VXQueryMetadataProvider implements IMetadataProvider<String, 
String> {
     private final String[] nodeList;
     private final Map<String, File> sourceFileMap;
     private final StaticContext staticCtx;
+    private final String hdfsConf;
+    private final Map<String, NodeControllerInfo> nodeControllerInfos;
 
-    public VXQueryMetadataProvider(String[] nodeList, Map<String, File> 
sourceFileMap, StaticContext staticCtx) {
+    public VXQueryMetadataProvider(String[] nodeList, Map<String, File> 
sourceFileMap, StaticContext staticCtx,
+            String hdfsConf, Map<String, NodeControllerInfo> 
nodeControllerInfos) {
         this.nodeList = nodeList;
         this.sourceFileMap = sourceFileMap;
         this.staticCtx = staticCtx;
+        this.hdfsConf = hdfsConf;
+        this.nodeControllerInfos = nodeControllerInfos;
     }
 
     @Override
@@ -82,7 +87,7 @@ public class VXQueryMetadataProvider implements 
IMetadataProvider<String, String
             List<LogicalVariable> scanVariables, List<LogicalVariable> 
projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> 
maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, 
JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) 
dataSource;
         if (sourceFileMap != null) {
             final int len = ds.getPartitions().length;
@@ -95,7 +100,8 @@ public class VXQueryMetadataProvider implements 
IMetadataProvider<String, String
             ds.setPartitions(collectionPartitions);
         }
         RecordDescriptor rDesc = new RecordDescriptor(new 
ISerializerDeserializer[opSchema.getSize()]);
-        IOperatorDescriptor scanner = new 
VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc);
+        IOperatorDescriptor scanner = new 
VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf,
+                this.nodeControllerInfos);
 
         AlgebricksPartitionConstraint constraint = 
getClusterLocations(nodeList, ds.getPartitionCount());
         return new Pair<IOperatorDescriptor, 
AlgebricksPartitionConstraint>(scanner, constraint);
@@ -129,7 +135,7 @@ public class VXQueryMetadataProvider implements 
IMetadataProvider<String, String
     @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
@@ -155,7 +161,7 @@ public class VXQueryMetadataProvider implements 
IMetadataProvider<String, String
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, 
List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, 
JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 

Reply via email to