Collection with tag rule and changes

Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/924a01b4
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/924a01b4
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/924a01b4

Branch: refs/heads/steven/hdfs
Commit: 924a01b44e828b0ce931de927be90421e1c9829a
Parents: b33b194
Author: efikalti <[email protected]>
Authored: Tue Jul 28 14:57:07 2015 +0300
Committer: efikalti <[email protected]>
Committed: Tue Jul 28 14:57:07 2015 +0300

----------------------------------------------------------------------
 .../compiler/rewriter/RewriteRuleset.java       |   2 +
 .../rewriter/rules/AbstractCollectionRule.java  |  85 +++++
 .../rewriter/rules/CollectionWithTagRule.java   |  65 ++++
 .../vxquery/functions/builtin-functions.xml     |   8 +
 .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 339 +++++++++++++++++--
 .../hdfs2/XmlCollectionByTagInputFormat.java    | 221 ++++++++++++
 .../metadata/VXQueryCollectionDataSource.java   |  12 +
 .../VXQueryCollectionOperatorDescriptor.java    | 129 +++++--
 8 files changed, 792 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
index b67402b..1affeed 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
@@ -19,6 +19,7 @@ package org.apache.vxquery.compiler.rewriter;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.vxquery.compiler.rewriter.rules.CollectionWithTagRule;
 import 
org.apache.vxquery.compiler.rewriter.rules.ConsolidateAssignAggregateRule;
 import org.apache.vxquery.compiler.rewriter.rules.ConvertAssignToUnnestRule;
 import 
org.apache.vxquery.compiler.rewriter.rules.ConvertFromAlgebricksExpressionsRule;
@@ -115,6 +116,7 @@ public class RewriteRuleset {
         normalization.add(new SetCollectionDataSourceRule());
         normalization.add(new IntroduceCollectionRule());
         normalization.add(new RemoveUnusedAssignAndAggregateRule());
+        normalization.add(new CollectionWithTagRule());
 
         // Adds child steps to the data source scan.
         // TODO Replace consolidate with a new child function that takes 
multiple paths.

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
index 717914a..488602f 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
@@ -125,6 +125,91 @@ public abstract class AbstractCollectionRule implements 
IAlgebraicRewriteRule {
         return collectionName;
     }
 
+    protected String[] getCollectionWithTagName(Mutable<ILogicalOperator> 
opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return null;
+        }
+        UnnestOperator unnest = (UnnestOperator) op;
+
+        // Check if assign is for fn:Collection.
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
unnest.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return null;
+        }
+        AssignOperator assign = (AssignOperator) op2;
+
+        // Check to see if the expression is a function and fn:Collection.
+        ILogicalExpression logicalExpression = (ILogicalExpression) 
assign.getExpressions().get(0).getValue();
+        if (logicalExpression.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
+            return null;
+        }
+        AbstractFunctionCallExpression functionCall = 
(AbstractFunctionCallExpression) logicalExpression;
+        if (!functionCall.getFunctionIdentifier().equals(
+                
BuiltinFunctions.FN_COLLECTIONWITHTAG_2.getFunctionIdentifier())) {
+            return null;
+        }
+        // Get arguments
+        int size = functionCall.getArguments().size();
+        if(size > 0)
+        {
+            String args[] = new String[size];
+            for (int i=0; i<functionCall.getArguments().size(); i++)
+            {
+                args[i] = getArgument(functionCall, opRef, i);
+            }
+            return args;
+        }
+        return null;
+    }
+    
+    private String getArgument(AbstractFunctionCallExpression functionCall, 
Mutable<ILogicalOperator> opRef, int pos)
+    {
+        VXQueryConstantValue constantValue;
+        ILogicalExpression logicalExpression2 = (ILogicalExpression) 
functionCall.getArguments().get(pos).getValue();
+        if (logicalExpression2.getExpressionTag() != 
LogicalExpressionTag.VARIABLE) {
+            return null;
+        }
+        VariableReferenceExpression vre = (VariableReferenceExpression) 
logicalExpression2;
+        Mutable<ILogicalOperator> opRef3 = 
OperatorToolbox.findProducerOf(opRef, vre.getVariableReference());
+
+        // Get the string assigned to the collection function.
+        AbstractLogicalOperator op3 = (AbstractLogicalOperator) 
opRef3.getValue();
+        if (op3.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            AssignOperator assign2 = (AssignOperator) op3;
+
+            // Check to see if the expression is a constant expression and 
type string.
+            ILogicalExpression logicalExpression3 = (ILogicalExpression) 
assign2.getExpressions().get(0).getValue();
+            if (logicalExpression3.getExpressionTag() != 
LogicalExpressionTag.CONSTANT) {
+                return null;
+            }
+            ConstantExpression constantExpression = (ConstantExpression) 
logicalExpression3;
+            constantValue = (VXQueryConstantValue) 
constantExpression.getValue();
+            if (constantValue.getType() != 
SequenceType.create(BuiltinTypeRegistry.XS_STRING, Quantifier.QUANT_ONE)) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        String args[] = new String[2];
+        // Constant value is now in a TaggedValuePointable. Convert the value 
into a java String.
+        tvp.set(constantValue.getValue(), 0, constantValue.getValue().length);
+        String arg = null;
+        if (tvp.getTag() == ValueTag.XS_STRING_TAG) {
+            tvp.getValue(stringp);
+            try {
+                bbis.setByteBuffer(
+                        
ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(), 
stringp.getStartOffset(),
+                                stringp.getLength() + 
stringp.getStartOffset())), 0);
+                arg = di.readUTF();
+                return arg;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        return null;
+    }
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
         return false;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java
new file mode 100644
index 0000000..2e730ec
--- /dev/null
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.compiler.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
+import org.apache.vxquery.metadata.VXQueryCollectionDataSource;
+import org.apache.vxquery.types.AnyItemType;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public class CollectionWithTagRule extends AbstractCollectionRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) throws AlgebricksException {
+        VXQueryOptimizationContext vxqueryContext = 
(VXQueryOptimizationContext) context;
+        String args[] = getCollectionWithTagName(opRef);
+
+        if (args != null) {
+            // Build the new operator and update the query plan.
+            int collectionId = vxqueryContext.newCollectionId();
+            VXQueryCollectionDataSource ds = 
VXQueryCollectionDataSource.create(collectionId, args[0],
+                    SequenceType.create(AnyItemType.INSTANCE, 
Quantifier.QUANT_STAR));
+            if (ds != null) {
+                ds.setTotalDataSources(vxqueryContext.getTotalDataSources());
+                ds.setTag(args[1]);
+                // Known to be true because of collection name.
+                AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+                UnnestOperator unnest = (UnnestOperator) op;
+                Mutable<ILogicalOperator> opRef2 = unnest.getInputs().get(0);
+                AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
opRef2.getValue();
+                AssignOperator assign = (AssignOperator) op2;
+
+                DataSourceScanOperator opNew = new 
DataSourceScanOperator(assign.getVariables(), ds);
+                opNew.getInputs().addAll(assign.getInputs());
+                opRef2.setValue(opNew);
+                return true;
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index 38f03a4..4e5b6de 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -128,6 +128,14 @@
         <!-- Collection operator is added during the rewrite rules phase.  -->
     </function>
     
+    <!-- fn:collectionwithtag($arg1  as xs:string?, $arg2 as xs:string?) as  
node()* -->
+    <function name="fn:collectionwithtag">
+        <param name="arg1" type="xs:string?"/>
+        <param name="arg2" type="xs:string?"/>
+        <return type="node()*"/>
+        <!-- CollectionWithTag operator is added during the rewrite rules 
phase.  -->
+    </function>
+    
     <!-- fn:compare($comparand1  as xs:string?, $comparand2 as xs:string?)  as 
xs:integer?  -->
     <function name="fn:compare">
         <param name="comparand1" type="xs:string?"/>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
index 50037ea..e13adf5 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
@@ -16,42 +16,92 @@
  */
 package org.apache.vxquery.hdfs2;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Properties;
 
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
+
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 public class HDFSFunctions {
 
     private Configuration conf;
     private FileSystem fs;
     private String conf_path;
+    private Job job;
+    private InputFormat inputFormat;
+    private List<InputSplit> splits;
+    private ArrayList<ArrayList<String>> nodes;
+    private File nodeXMLfile;
+    private HashMap<Integer, String> schedule;
 
     /**
      * Create the configuration and add the paths for core-site and hdfs-site 
as resources.
      * Initialize an instance of HDFS FileSystem for this configuration.
-     * 
-     * @param hadoop_conf_filepath
      */
     public HDFSFunctions() {
-        if (locateConf()) {
-            this.conf = new Configuration();
+        this.conf = new Configuration();
+    }
 
-            conf.addResource(new Path(this.conf_path + "/core-site.xml"));
-            conf.addResource(new Path(this.conf_path + "/hdfs-site.xml"));
-            try {
-                fs = FileSystem.get(conf);
-            } catch (IOException ex) {
-                System.err.println(ex);
-            }
-        } else {
-            System.err.println("Could not locate hdfs configuarion folder.");
+    /**
+     * Create the needed objects for reading the splits of the filepath given 
as argument.
+     * This method should run before the scheduleSplits method.
+     * 
+     * @param filepath
+     */
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    public void setJob(String filepath, String tag) {
+        try {
+            conf.set("start_tag", "<" + tag + ">");
+            conf.set("end_tag", "</" + tag + ">");
+            job = new Job(conf, "Read from HDFS");
+            Path input = new Path(filepath);
+            FileInputFormat.addInputPath(job, input);
+            //TODO change input format class to 
XMLInputFormatClassOneBufferSolution
+            job.setInputFormatClass(XmlCollectionByTagInputFormat.class);
+            inputFormat = 
ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+            splits = inputFormat.getSplits(job);
+        } catch (IOException e) {
+            System.err.println(e);
+        } catch (ClassNotFoundException e) {
+            System.err.println(e);
+        } catch (InterruptedException e) {
+            System.err.println(e);
         }
     }
 
@@ -71,15 +121,11 @@ public class HDFSFunctions {
         } catch (IOException ex) {
             System.err.println(ex);
         }
-        //Search every file and folder in the home directory
-        if (searchInDirectory(fs.getHomeDirectory(), filename) != null) {
-            return true;
-        }
-        return false;
+        return searchInDirectory(fs.getHomeDirectory(), filename) != null;
     }
 
     /**
-     * Searches the given directory and subdirectories for the file.
+     * Searches the given directory for the file.
      * 
      * @param directory
      *            to search
@@ -88,7 +134,7 @@ public class HDFSFunctions {
      * @return path if file exists in this directory.else return null.
      */
     public Path searchInDirectory(Path directory, String filename) {
-        //Search every folder in the directory
+        //Search the files and folder in this Path to find the one matching 
the filename.
         try {
             RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, 
true);
             String[] parts;
@@ -114,9 +160,9 @@ public class HDFSFunctions {
      */
     private boolean locateConf() {
 
-        this.conf_path = System.getProperty("HDFS_CONF");
         if (this.conf_path == null) {
 
+            nodeXMLfile = new 
File("/home/efi/Projects/vxquery/vxquery-server/src/main/resources/conf/local.xml");
             // load properties file
             Properties prop = new Properties();
             String propFilePath = 
"../vxquery-server/src/main/resources/conf/cluster.properties";
@@ -127,22 +173,17 @@ public class HDFSFunctions {
                 try {
                     prop.load(new FileInputStream(propFilePath));
                 } catch (FileNotFoundException e1) {
-                    e1.printStackTrace();
                 } catch (IOException e1) {
-                    e1.printStackTrace();
                 }
             } catch (IOException e) {
-                e.printStackTrace();
+                System.err.println(e);
             }
 
             // get the property value for HDFS_CONF
             this.conf_path = prop.getProperty("HDFS_CONF");
-            if (this.conf_path == null) {
-                return false;
-            }
-            return true;
+            return this.conf_path != null;
         }
-        return true;
+        return false;
     }
 
     /**
@@ -161,12 +202,12 @@ public class HDFSFunctions {
                     fs.delete(dest, true); //recursive delete
                 }
             } catch (IOException e) {
-                e.printStackTrace();
+                System.err.println(e);
             }
             try {
                 fs.copyFromLocalFile(path, dest);
             } catch (IOException e) {
-                e.printStackTrace();
+                System.err.println(e);
             }
         }
         return false;
@@ -179,14 +220,242 @@ public class HDFSFunctions {
      * @return
      */
     public FileSystem getFileSystem() {
-        if (this.conf_path != null) {
-            return this.fs;
+        if (locateConf()) {
+            conf.addResource(new Path(this.conf_path + "/core-site.xml"));
+            conf.addResource(new Path(this.conf_path + "/hdfs-site.xml"));
+            try {
+                fs = FileSystem.get(conf);
+                return this.fs;
+            } catch (IOException ex) {
+                System.err.println(ex);
+            }
         } else {
-            return null;
+            System.err.println("Could not locate hdfs configuarion folder.");
+        }
+        return null;
+    }
+
+    /**
+     * Create a HashMap that has as key the hostname and values the splits 
that belong to this hostname;
+     * 
+     * @return
+     * @throws IOException
+     */
+    public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws 
IOException {
+        HashMap<String, ArrayList<Integer>> splits_map = new HashMap<String, 
ArrayList<Integer>>();
+        ArrayList<Integer> temp;
+        int i = 0;
+        String hostname;
+        for (InputSplit s : this.splits) {
+            SplitLocationInfo info[] = s.getLocationInfo();
+            hostname = info[0].getLocation();
+            if (splits_map.containsKey(hostname)) {
+                temp = splits_map.get(hostname);
+                temp.add(i);
+            } else {
+                temp = new ArrayList<Integer>();
+                temp.add(i);
+                splits_map.put(hostname, temp);
+            }
+            i++;
+        }
+
+        return splits_map;
+    }
+
+    public void scheduleSplits() throws IOException {
+
+        schedule = new HashMap<Integer, String>();
+        ArrayList<String> empty = new ArrayList<String>();
+        HashMap<String, ArrayList<Integer>> splits_map = 
this.getLocationsOfSplits();
+        readNodesFromXML();
+        int count = this.splits.size();
+
+        ArrayList<Integer> splits;
+        String node;
+        for (ArrayList<String> info : this.nodes) {
+            node = info.get(0);
+            if (splits_map.containsKey(node)) {
+                splits = splits_map.get(node);
+                for (Integer split : splits) {
+                    schedule.put(split, node);
+                    count--;
+                }
+                splits_map.remove(node);
+            } else {
+                empty.add(node);
+            }
+        }
+
+        //Check if every split got assigned to a node
+        if (count != 0) {
+            ArrayList<Integer> remaining = new ArrayList<Integer>();
+            // Find remaining splits
+            for (InputSplit s : this.splits) {
+                int i = 0;
+                if (!schedule.containsKey(i)) {
+                    remaining.add(i);
+                }
+            }
+
+            if (empty.size() != 0) {
+                int node_number = 0;
+                for (int split : remaining) {
+                    if (node_number == empty.size()) {
+                        node_number = 0;
+                    }
+                    schedule.put(split, empty.get(node_number));
+                    node_number++;
+                }
+            }
+        }
+        // TODO remove from here this is for debugging only
+        for (int s : schedule.keySet()) {
+            System.out.println("split: " + s + ", host: " + schedule.get(s));
+        }
+    }
+
+    /**
+     * Read the hostname and the ip address of every node from the xml cluster 
configuration file.
+     * Save the information inside an ArrayList.
+     */
+    public void readNodesFromXML() {
+        DocumentBuilderFactory dbFactory = 
DocumentBuilderFactory.newInstance();
+        DocumentBuilder dBuilder;
+        try {
+            dBuilder = dbFactory.newDocumentBuilder();
+            Document doc = dBuilder.parse(nodeXMLfile);
+            doc.getDocumentElement().normalize();
+
+            nodes = new ArrayList<ArrayList<String>>();
+            NodeList nList = doc.getElementsByTagName("node");
+
+            for (int temp = 0; temp < nList.getLength(); temp++) {
+
+                Node nNode = nList.item(temp);
+
+                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
+
+                    Element eElement = (Element) nNode;
+                    ArrayList<String> info = new ArrayList<String>();
+                    
info.add(eElement.getElementsByTagName("id").item(0).getTextContent());
+                    
info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent());
+                    nodes.add(info);
+                }
+            }
+        } catch (ParserConfigurationException e) {
+            System.err.println(e);
+        } catch (SAXException e) {
+            System.err.println(e);
+        } catch (IOException e) {
+            System.err.println(e);
+        }
+    }
+
+    /**
+     * Writes the schedule to a temporary file, then uploads the file to the 
HDFS.
+     */
+    public void addScheduleToDistributedCache() {
+        String filepath = "/tmp/splits_schedule.txt";
+        String dfs_path = "vxquery_splits_schedule.txt";
+        PrintWriter writer;
+        try {
+            writer = new PrintWriter(filepath, "UTF-8");
+            for (int split : this.schedule.keySet()) {
+                writer.write(split + "," + this.schedule.get(split));
+            }
+            writer.close();
+        } catch (FileNotFoundException e) {
+            System.err.println(e);
+        } catch (UnsupportedEncodingException e) {
+            System.err.println(e);
+        }
+        // Add file to HDFS
+        this.put(filepath, dfs_path);
+    }
+
+    public RecordReader getReader() {
+
+        List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+        for (int i = 0; i < splits.size(); i++) {
+            fileSplits.add((FileSplit) splits.get(i));
         }
+        FileSplitsFactory splitsFactory;
+        try {
+            splitsFactory = new FileSplitsFactory(fileSplits);
+            List<FileSplit> inputSplits = splitsFactory.getSplits();
+            ContextFactory ctxFactory = new ContextFactory();
+            int size = inputSplits.size();
+            for (int i = 0; i < size; i++) {
+                /**
+                 * read the split
+                 */
+                TaskAttemptContext context;
+                try {
+                    context = ctxFactory.createContext(job.getConfiguration(), 
i);
+                    RecordReader reader = 
inputFormat.createRecordReader(inputSplits.get(i), context);
+                    reader.initialize(inputSplits.get(i), context);
+                    return reader;
+                } catch (HyracksDataException e) {
+                    System.err.println(e);
+                } catch (IOException e) {
+                    System.err.println(e);
+                } catch (InterruptedException e) {
+                    System.err.println(e);
+                }
+            }
+        } catch (HyracksDataException e1) {
+            // TODO Auto-generated catch block
+            e1.printStackTrace();
+        }
+        return null;
     }
 
-    public void scheduleSplits() {
+    /**
+     * @return schedule.
+     */
+    public HashMap<Integer, String> getSchedule() {
+        return this.schedule;
+    }
 
+    /**
+     * Return the splits belonging to this node for the existing schedule.
+     * 
+     * @param node
+     * @return
+     */
+    public ArrayList<Integer> getScheduleForNode(String node) {
+        ArrayList<Integer> node_schedule = new ArrayList<Integer>();
+        for (int split : this.schedule.keySet()) {
+            if (node.equals(this.schedule.get(split))) {
+                node_schedule.add(split);
+            }
+        }
+        return node_schedule;
+    }
+
+    public List<InputSplit> getSplits() {
+        return this.splits;
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public InputFormat getinputFormat() {
+        return this.inputFormat;
+    }
+
+    public Document convertStringToDocument(String xmlStr) {
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder builder;
+        try {
+            builder = factory.newDocumentBuilder();
+            Document doc = builder.parse(new InputSource(new 
StringReader(xmlStr)));
+            return doc;
+        } catch (Exception e) {
+            System.err.println(e);
+        }
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
new file mode 100644
index 0000000..fa2e4f9
--- /dev/null
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
public class XmlCollectionByTagInputFormat extends TextInputFormat {

    // Tags delimiting one record, taken from the job configuration keys
    // "start_tag" / "end_tag" in createRecordReader().
    // NOTE(review): public static mutable state — two jobs with different tags
    // in the same JVM would race. Consider passing the tags to the reader via
    // its Configuration instead.
    public static String STARTING_TAG;
    public static String ENDING_TAG;

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        try {
            STARTING_TAG = context.getConfiguration().get("start_tag");
            ENDING_TAG = context.getConfiguration().get("end_tag");
            return new XmlRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            // NOTE(review): swallowing the IOException and returning null will
            // surface later as an NPE at the call site; consider rethrowing
            // wrapped in a RuntimeException so the real cause is reported.
            return null;
        }
    }

    /**
     * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified
     * by the end tag.
     *
     * Scans the raw bytes of its FileSplit for STARTING_TAG..ENDING_TAG spans;
     * a single key/value pair holds ALL matched spans in the split concatenated
     * (see readBlock), keyed by the stream position after the scan.
     */
    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private final byte[] end_tag;
        private final byte[] start_tag;
        // Byte range [start, end) of this split within the file.
        private final long start;
        private final long end;
        private int current_block = 0; // NOTE(review): only written, never read.
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;
        BlockLocation[] blocks;
        public static byte[] nl = "\n".getBytes(); // NOTE(review): unused; uses platform charset.

        public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
            // Tags must have been set by createRecordReader() before this runs;
            // a null STARTING_TAG/ENDING_TAG here throws NPE.
            end_tag = ENDING_TAG.getBytes(Charsets.UTF_8);
            start_tag = STARTING_TAG.getBytes(Charsets.UTF_8);

            // open the file and seek to the start of the split
            start = split.getStart();
            // set the end of the file
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FileStatus fStatus = fs.getFileStatus(file);
            blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
            // seek the start of file
            fsin = fs.open(split.getPath());
            fsin.seek(start);
        }

        /**
         * Get next block item: scans the remainder of the split and, if any
         * tagged span was found, sets key to the current stream position and
         * value to the accumulated buffer contents.
         *
         * @param key   output key (stream position after the scan)
         * @param value output value (concatenated tagged spans)
         * @return true if at least one span was found
         * @throws IOException on read failure
         */
        private boolean next(LongWritable key, Text value) throws IOException {
            // current_block = nextBlock();
            //if (fsin.getPos() < end && current_block < blocks.length)
            if (fsin.getPos() < end) {
                try {
                    if (readBlock(true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    // Reset even on failure so a partial span is not re-emitted.
                    buffer.reset();
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            // swallowIOException=true: best-effort close.
            Closeables.close(fsin, true);
        }

        @Override
        public float getProgress() throws IOException {
            // NOTE(review): divides by zero for an empty split (end == start).
            return (fsin.getPos() - start) / (float) (end - start);
        }

        /**
         * Read the block from start till end and after that until you find a closing tag.
         * Accumulates EVERY start_tag..end_tag span remaining in the split into
         * {@code buffer}; returns only once the split is exhausted.
         *
         * @param withinBlock NOTE(review): ignored by the current implementation.
         * @return true if at least one span was written to the buffer
         * @throws IOException on read failure
         */
        private boolean readBlock(boolean withinBlock) throws IOException {
            boolean read = false;

            while (true) {
                if (fsin.getPos() < end) {
                    if (readUntilMatch(start_tag, false)) {
                        buffer.write(start_tag);
                        readUntilMatch(end_tag, true);
                        //buffer.write(this.nl);
                        read = true;
                    }
                } else {
                    return read;
                }
            }
        }

        /**
         * Read from block(s) until you reach the end of file or find a matching bytes with match[].
         *
         * @param match       byte pattern to scan for
         * @param withinBlock when true, every byte read is copied to the buffer
         *                    (including the matched pattern itself) and the scan
         *                    may run past the split end; when false, the scan
         *                    stops at the split boundary.
         * @return true if the pattern was matched before EOF/boundary
         * @throws IOException on read failure
         */
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) {
                    return false;
                }
                // save to buffer:
                if (withinBlock) {
                    buffer.write(b);
                }

                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) {
                        return true;
                    }
                } else {
                    // NOTE(review): the mismatching byte is not re-tested against
                    // match[0], so a pattern whose first byte recurs (e.g. "<<a")
                    // can be missed. Benign for well-formed XML (no "<<"), but a
                    // known limitation of this naive matcher.
                    i = 0;
                }
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }

        // NOTE(review): currently unused (see commented-out call in next());
        // maps the current stream position to the index of the next HDFS block.
        private int nextBlock() throws IOException {
            long pos = fsin.getPos();
            long block_length;
            for (int i = 0; i < blocks.length; i++) {
                block_length = blocks[i].getOffset() + blocks[i].getLength();
                if (pos == block_length) {
                    return i + 1;
                }
            }
            return 0;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // All setup happens in the constructor; nothing to do here.
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentKey = new LongWritable();
            currentValue = new Text();
            return next(currentKey, currentValue);
        }
    }
}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index d17a1a9..decff1c 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -37,6 +37,7 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
     private String[] collectionPartitions;
     private final List<Integer> childSeq;
     private int totalDataSources;
+    private String tag;
 
     private final Object[] types;
 
@@ -60,6 +61,7 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
             }
         };
         this.childSeq = new ArrayList<Integer>();
+        this.tag = null;
     }
 
     public int getTotalDataSources() {
@@ -85,6 +87,16 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
    /**
     * @return the number of collection partitions (length of the configured
     *         partition-path array).
     */
    public int getPartitionCount() {
        return collectionPartitions.length;
    }
+    
+    public String getTag()
+    {
+        return this.tag;
+    }
+    
+    public void setTag(String tag)
+    {
+        this.tag = tag;
+    }
 
     @Override
     public String getId() {

http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index c945395..b7627d0 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -16,13 +16,15 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.logging.Level;
@@ -34,6 +36,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.vxquery.context.DynamicContext;
 import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
@@ -51,6 +59,8 @@ import 
edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import 
edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import 
edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
 
 public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -59,6 +69,8 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
     private String[] collectionPartitions;
     private List<Integer> childSeq;
     protected static final Logger LOGGER = 
Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
+    private HDFSFunctions hdfs;
+    private String tag;
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry 
spec, VXQueryCollectionDataSource ds,
             RecordDescriptor rDesc) {
@@ -68,6 +80,7 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
         recordDescriptors[0] = rDesc;
+        this.tag = ds.getTag();
     }
 
     @Override
@@ -92,6 +105,7 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
             public void open() throws HyracksDataException {
                 appender.reset(frame, true);
                 writer.open();
+                hdfs = new HDFSFunctions();
             }
 
             @Override
@@ -119,52 +133,99 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                         throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
                                 + collectionDirectory.getAbsolutePath() + ") 
passed to collection.");
                     }
-                }
-                //else check in HDFS file system
-                else {
-                    //get instance of the HDFS filesystem
-                    HDFSFunctions hdfs = new HDFSFunctions();
+                } else {
+                    // Else check in HDFS file system
+                    // Get instance of the HDFS filesystem
                     FileSystem fs = hdfs.getFileSystem();
                     if (fs != null) {
                         Path directory = new Path(collectionModifiedName);
                         Path xmlDocument;
-                        try {
-                            //check if the path exists and is a directory
-                            if (fs.exists(directory) && 
fs.isDirectory(directory)) {
-                                for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
-                                    //read every files in the directory
-                                    RemoteIterator<LocatedFileStatus> it = 
fs.listFiles(directory, true);
-                                    while (it.hasNext()) {
-                                        xmlDocument = it.next().getPath();
-                                        if (fs.isFile(xmlDocument)) {
-                                            if (LOGGER.isLoggable(Level.FINE)) 
{
-                                                LOGGER.fine("Starting to read 
XML document: " + xmlDocument.getName());
-                                            }
+                        if (tag != null) {
+                            String tags[] = tag.split("/");
+                            String start_tag = "<?xml version=\"1.0\" 
encoding=\"utf-8\"?>";
+                            String end_tag = "";
+                            for (int i = 0; i < tags.length - 1; i++) {
+                                start_tag = start_tag + "<" + tags[i] + ">";
+                                end_tag = end_tag + "</" + tags[i] + ">";
+                            }
+                            hdfs.setJob(directory.getName(), tags[tags.length 
- 1]);
+                            Job job = hdfs.getJob();
+                            InputFormat inputFormat = hdfs.getinputFormat();
+                            try {
+                                hdfs.scheduleSplits();
+                                ArrayList<Integer> schedule = 
hdfs.getScheduleForNode(InetAddress.getLocalHost()
+                                        .getHostName());
+                                List<InputSplit> splits = hdfs.getSplits();
+                                List<FileSplit> fileSplits = new 
ArrayList<FileSplit>();
+                                for (int i : schedule) {
+                                    fileSplits.add((FileSplit) splits.get(i));
+                                }
+                                FileSplitsFactory splitsFactory = new 
FileSplitsFactory(fileSplits);
+                                List<FileSplit> inputSplits = 
splitsFactory.getSplits();
+                                ContextFactory ctxFactory = new 
ContextFactory();
+                                int size = inputSplits.size();
+                                for (int i = 0; i < size; i++) {
+                                    //read split
+                                    TaskAttemptContext context = 
ctxFactory.createContext(job.getConfiguration(), i);
+                                    RecordReader reader;
+                                    try {
+                                        reader = 
inputFormat.createRecordReader(inputSplits.get(i), context);
+                                        reader.initialize(inputSplits.get(i), 
context);
+                                        while (reader.nextKeyValue() == true) {
+                                            String value = 
reader.getCurrentValue().toString();
+                                            String xml = start_tag;
+                                            xml = xml + value + end_tag;
+                                            System.out.println(xml);
                                             //create an input stream to the 
file currently reading and send it to parser
-                                            InputStream in = 
fs.open(xmlDocument).getWrappedStream();
-                                            parser.parseHDFSElements(in, 
writer, fta, tupleIndex);
+                                            InputStream stream = new 
ByteArrayInputStream(
+                                                    
xml.getBytes(StandardCharsets.UTF_8));
+                                            parser.parseHDFSElements(stream, 
writer, fta, i);
                                         }
+
+                                    } catch (InterruptedException e) {
+                                        System.err.println(e);
                                     }
                                 }
-                            } else {
-                                throw new HyracksDataException("Invalid HDFS 
directory parameter (" + nodeId + ":"
-                                        + collectionDirectory + ") passed to 
collection.");
-                            }
-                        } catch (FileNotFoundException e) {
-                            System.err.println(e);
-                        } catch (IOException e) {
-                            System.err.println(e);
-                        }
 
-                        //Check for collection with tags
-                        if (true) {
+                            } catch (IOException e) {
+                                System.err.println(e);
+                            }
+                        } else {
                             try {
-                                
System.out.println(InetAddress.getLocalHost().getHostName());
-                            } catch (UnknownHostException e) {
-                                e.printStackTrace();
+                                //check if the path exists and is a directory
+                                if (fs.exists(directory) && 
fs.isDirectory(directory)) {
+                                    for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
+                                        //read every file in the directory
+                                        RemoteIterator<LocatedFileStatus> it = 
fs.listFiles(directory, true);
+                                        while (it.hasNext()) {
+                                            xmlDocument = it.next().getPath();
+                                            if (fs.isFile(xmlDocument)) {
+                                                if 
(LOGGER.isLoggable(Level.FINE)) {
+                                                    LOGGER.fine("Starting to 
read XML document: "
+                                                            + 
xmlDocument.getName());
+                                                }
+                                                //create an input stream to 
the file currently reading and send it to parser
+                                                InputStream in = 
fs.open(xmlDocument).getWrappedStream();
+                                                parser.parseHDFSElements(in, 
writer, fta, tupleIndex);
+                                            }
+                                        }
+                                    }
+                                } else {
+                                    throw new HyracksDataException("Invalid 
HDFS directory parameter (" + nodeId + ":"
+                                            + collectionDirectory + ") passed 
to collection.");
+                                }
+                            } catch (FileNotFoundException e) {
+                                System.err.println(e);
+                            } catch (IOException e) {
+                                System.err.println(e);
                             }
                         }
                     }
+                    try {
+                        fs.close();
+                    } catch (IOException e) {
+                        System.err.println(e);
+                    }
                 }
             }
 

Reply via email to