VXQUERY-131: Support for reading HDFS XML files. 1. Users can choose between local and HDFS file systems for data input. 2. New collection-with-tag function that reads from HDFS block by block. 3. Both the collection and doc functions can read from HDFS. 4. Custom InputFormat class for XML in HDFS. 5. Parsing of data from HDFS as whole files. 6. Unit tests for HDFS. 7. MiniDFS cluster for unit tests. 8. Documentation. 9. Allow users to pass the HDFS configuration folder as a parameter to queries. Author: Efi Kaltirimidou (GitHub: efikalti)
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/3fc2d6c2 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/3fc2d6c2 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/3fc2d6c2 Branch: refs/heads/site Commit: 3fc2d6c2f74083513f5a45ccdef2bde8ad23d9a8 Parents: 7de1fba Author: Steven Jacobs <[email protected]> Authored: Wed May 18 14:56:53 2016 -0700 Committer: Steven Jacobs <[email protected]> Committed: Wed May 18 14:56:53 2016 -0700 ---------------------------------------------------------------------- pom.xml | 83 +++- src/site/apt/user_cluster_installation.apt | 2 +- src/site/apt/user_query.apt | 1 + src/site/apt/user_query_hdfs.apt | 180 ++++++++ src/site/site.xml | 3 + vxquery-cli/pom.xml | 12 + .../java/org/apache/vxquery/cli/VXQuery.java | 46 +- vxquery-core/pom.xml | 61 ++- .../compiler/rewriter/RewriteRuleset.java | 2 +- .../rewriter/rules/AbstractCollectionRule.java | 50 ++- .../rewriter/rules/IntroduceCollectionRule.java | 11 +- .../vxquery/functions/builtin-functions.xml | 8 + .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 428 +++++++++++++++++++ .../hdfs2/XmlCollectionWithTagInputFormat.java | 217 ++++++++++ .../metadata/VXQueryCollectionDataSource.java | 12 +- .../VXQueryCollectionOperatorDescriptor.java | 190 +++++++- .../metadata/VXQueryMetadataProvider.java | 20 +- .../runtime/functions/util/FunctionHelper.java | 49 ++- .../org/apache/vxquery/xmlparser/XMLParser.java | 49 ++- .../xmlquery/query/XMLQueryCompiler.java | 92 ++-- .../xmlquery/query/SimpleXQueryTest.java | 19 +- .../src/main/resources/conf/cluster.properties | 6 +- vxquery-xtest/pom.xml | 15 + .../java/org/apache/vxquery/xtest/MiniDFS.java | 75 ++++ .../org/apache/vxquery/xtest/TestRunner.java | 23 +- .../org/apache/vxquery/xtest/XTestOptions.java | 3 + .../vxquery/xtest/AbstractXQueryTest.java | 1 + .../org/apache/vxquery/xtest/VXQueryTest.java | 19 + .../HDFS/Aggregate/avgHDFS.txt | 1 + 
.../HDFS/Aggregate/countHDFS.txt | 1 + .../HDFS/Aggregate/maxHDFS.txt | 1 + .../HDFS/Aggregate/maxvalueHDFS.txt | 1 + .../HDFS/Aggregate/minHDFS.txt | 1 + .../HDFS/Aggregate/sumHDFS.txt | 1 + .../Queries/XQuery/HDFS/Aggregate/avgHDFS.xq | 25 ++ .../Queries/XQuery/HDFS/Aggregate/countHDFS.xq | 25 ++ .../Queries/XQuery/HDFS/Aggregate/maxHDFS.xq | 25 ++ .../XQuery/HDFS/Aggregate/maxvalueHDFS.xq | 23 + .../Queries/XQuery/HDFS/Aggregate/minHDFS.xq | 25 ++ .../Queries/XQuery/HDFS/Aggregate/sumHDFS.xq | 25 ++ .../src/test/resources/VXQueryCatalog.xml | 15 + .../test/resources/cat/HDFSAggregateQueries.xml | 53 +++ .../test/resources/hadoop/conf/core-site.xml | 34 ++ .../test/resources/hadoop/conf/hdfs-site.xml | 34 ++ .../test/resources/hadoop/conf/mapred-site.xml | 41 ++ 45 files changed, 1844 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8a97a92..911206d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -169,6 +170,24 @@ </roles> <timezone>-8</timezone> </contributor> + <contributor> + <name>Menaka Madushanka</name> + <email /> + <organization /> + <roles> + <role>Developer</role> + </roles> + <timezone>+5:30</timezone> + </contributor> + <contributor> + <name>Riyafa Abdul Hameed</name> + <email /> + <organization /> + <roles> + <role>Developer</role> + </roles> + <timezone>+5:30</timezone> + </contributor> </contributors> <mailingLists> @@ -295,6 +314,25 @@ </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-hdfs-2.x</artifactId> + <version>${hyracks.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-hdfs-core</artifactId> + <version>${hyracks.version}</version> + <type>jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>2.7.0</version> + </dependency> + + <dependency> <groupId>ant</groupId> <artifactId>ant-trax</artifactId> <version>1.6.5</version> @@ -340,7 +378,7 @@ <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> - <version>1.3.2</version> + <version>2.4</version> </dependency> <dependency> @@ -359,9 +397,10 @@ <dependency> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty</artifactId> - <version>6.1.4</version> + <version>6.1.22</version> <scope>compile</scope> </dependency> + </dependencies> </dependencyManagement> @@ -437,19 +476,21 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> 
<artifactId>maven-assembly-plugin</artifactId> - <!-- We override the configuration plugin to override the descriptor to use for building - the source release zip. Specifically, we would like to control the inclusions/exclusions. - For example, we exclude the KEYS file from the zip --> + <!-- We override the configuration plugin to override the descriptor + to use for building the source release zip. Specifically, we would like to + control the inclusions/exclusions. For example, we exclude the KEYS file + from the zip --> <executions> <execution> - <!-- Use this id to match the id mentioned in the assembly plugin configuration in - the apache parent POM under the apache-release profile --> + <!-- Use this id to match the id mentioned in the assembly plugin configuration + in the apache parent POM under the apache-release profile --> <id>source-release-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> - <!-- combine.self should be override to replace the configuration in the parent POM --> + <!-- combine.self should be override to replace the configuration in + the parent POM --> <configuration combine.self="override"> <runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot> <descriptors> @@ -491,6 +532,29 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-site-plugin</artifactId> </plugin> + <plugin> + <groupId>com.google.code.maven-replacer-plugin</groupId> + <artifactId>replacer</artifactId> + <version>1.5.3</version> + <executions> + <execution> + <phase>prepare-package</phase> + <goals> + <goal>replace</goal> + </goals> + </execution> + </executions> + <configuration> + <ignoreMissingFile>true</ignoreMissingFile> + <file>vxquery-server/src/main/resources/conf/cluster.properties</file> + <outputFile> + vxquery-server/src/main/resources/conf/cluster.properties + </outputFile> + <regex>false</regex> + <token>$CONF_PATH$</token> + <value>${basedir}/vxquery-xtest/src/test/resources/hadoop/conf</value> + </configuration> + 
</plugin> </plugins> </build> @@ -541,6 +605,7 @@ <maxmemory>2g</maxmemory> </configuration> </plugin> + </plugins> </reporting> http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/apt/user_cluster_installation.apt ---------------------------------------------------------------------- diff --git a/src/site/apt/user_cluster_installation.apt b/src/site/apt/user_cluster_installation.apt index 0e756b0..19e20ed 100644 --- a/src/site/apt/user_cluster_installation.apt +++ b/src/site/apt/user_cluster_installation.apt @@ -32,7 +32,7 @@ Cluster Installation * Apache VXQuery\x99 source archive (apache-vxquery-X.Y-source-release.zip) - * JDK >= 1.7 + * JDK >= 1.8 * Apache Maven >= 3.2 http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/apt/user_query.apt ---------------------------------------------------------------------- diff --git a/src/site/apt/user_query.apt b/src/site/apt/user_query.apt index 8ea6429..c5132c3 100644 --- a/src/site/apt/user_query.apt +++ b/src/site/apt/user_query.apt @@ -49,6 +49,7 @@ vxq "path-to"\test.xq -showrp : Show Runtime plan -showtet : Show translated expression tree -timing : Produce timing information +-hdfs-conf VAL : The folder containing the HDFS configuration files ---------------------------------------- * Java Options http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/apt/user_query_hdfs.apt ---------------------------------------------------------------------- diff --git a/src/site/apt/user_query_hdfs.apt b/src/site/apt/user_query_hdfs.apt new file mode 100644 index 0000000..fa736b8 --- /dev/null +++ b/src/site/apt/user_query_hdfs.apt @@ -0,0 +1,180 @@ +~~ Licensed to the Apache Software Foundation (ASF) under one or more +~~ contributor license agreements. See the NOTICE file distributed with +~~ this work for additional information regarding copyright ownership. 
+~~ The ASF licenses this file to You under the Apache License, Version 2.0 +~~ (the "License"); you may not use this file except in compliance with +~~ the License. You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. + +Executing a Query in HDFS + + +* 1. Connecting VXQuery with HDFS + + In order to read HDFS data, VXQuery needs access to the HDFS configuration + directory, which contains: + + core-site.xml + hdfs-site.xml + mapred-site.xml + + Some systems may automatically set this directory as a system environment + variable ("HADOOP_CONF_DIR"). If this is the case, VXQuery will retrieve + it automatically when attempting to perform HDFS queries. + + When this variable is not set, users will need to provide this directory as + a command-line option when executing VXQuery: + -hdfs-conf /path/to/hdfs/conf_folder + + +* 2. Running the Query + + + There are two ways for VXQuery to access files stored in HDFS: + + + [[a]] Reading them as whole files. + + + [[b]] Reading them block by block. + + +** a. Reading them as whole files. + + For this option you only need to change the path to the files. To indicate that your + file(s) should be read from HDFS, you must add <"hdfs://"> in front + of the path. VXQuery will read the path of the files you request in your query + and try to locate them.
+ + + So in order to run a query that will read the input files from HDFS you need + to make sure that: + + + a) The environment variable "HADOOP_CONF_DIR" is set, or you pass the + directory location using -hdfs-conf. + + + b) The path defined in your query begins with <hdfs://> followed by the full path to + the file(s). + + + c) The path exists on HDFS and the user that runs the query has read permission + on these files. + + +*** Example + + I want to find all the <books> that were published after 2004. + + + The file is located in HDFS at the path </user/hduser/store/books.xml>. + + + My query will look like this: + + +---------- +for $x in collection("hdfs://user/hduser/store") +where $x/year>2004 +return $x/title +---------- + + + If I want only one file, <<books.xml>>, to be parsed from HDFS, my query will + look like this: + + +---------- +for $x in doc("hdfs://user/hduser/store/books.xml") +where $x/year>2004 +return $x/title +---------- + + +** b. Reading them block by block + + + To use this option you need to modify your query. Instead of using the + <collection> or <doc> function to define your input file(s), you need to use + <collection-with-tag>. + + + <collection-with-tag> accepts two arguments: the first is the path to the HDFS directory + where you have stored your input files, and the second is a specific <<tag>> that exists + in the input file(s). This is the tag of the element that contains the fields that + your query is looking for. + + Other than these arguments, you do not need to change anything else in the query. + + Note: since this strategy is optimized to read block by block, the result will + include all elements with the given tag, regardless of depth within the XML tree. + + +*** Example + + The same example, using <<collection-with-tag>>.
+ + My input file <books.xml>: + +----------------------------- +<?xml version="1.0" encoding="UTF-8"?> +<bookstore> + +<book> + <title lang="en">Everyday Italian</title> + <author>Giada De Laurentiis</author> + <year>2005</year> + <price>30.00</price> +</book> + +<book> + <title lang="en">Harry Potter</title> + <author>J K. Rowling</author> + <year>2005</year> + <price>29.99</price> +</book> + +<book> + <title lang="en">XQuery Kick Start</title> + <author>James McGovern</author> + <author>Per Bothner</author> + <author>Kurt Cagle</author> + <author>James Linn</author> + <author>Vaidyanathan Nagarajan</author> + <year>2003</year> + <price>49.99</price> +</book> + +<book> + <title lang="en">Learning XML</title> + <author>Erik T. Ray</author> + <year>2003</year> + <price>39.95</price> +</book> + +</bookstore> +---------------------------- + + + My query will look like this: + + +---------------------------- +for $x in collection-with-tag("hdfs://user/hduser/store","book")/book +where $x/year>2004 +return $x/title +---------------------------- + + + Note that I defined the path to the directory containing the file(s), + not to the file itself: <collection-with-tag> expects the path to a directory. I also + added </book> after the function call. As with the <collection> and + <doc> functions, this is needed for the query to be parsed correctly. http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/src/site/site.xml ---------------------------------------------------------------------- diff --git a/src/site/site.xml b/src/site/site.xml index 5640976..d64fe0d 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -64,6 +64,9 @@ limitations under the License.
name="Executing a Query" href="user_query.html" /> <item + name="Using HDFS with VXQuery" + href="user_query_hdfs.html" /> + <item name="Running the Test Suite" href="user_running_tests.html" /> </menu> http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-cli/pom.xml ---------------------------------------------------------------------- diff --git a/vxquery-cli/pom.xml b/vxquery-cli/pom.xml index 7bf1ff9..c4e11cb 100644 --- a/vxquery-cli/pom.xml +++ b/vxquery-cli/pom.xml @@ -126,7 +126,19 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-dataflow-std</artifactId> </dependency> + + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-hdfs-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-hdfs-2.x</artifactId> + </dependency> </dependencies> + + <reporting> <plugins> http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java ---------------------------------------------------------------------- diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java index a02c65d..17287c6 100644 --- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java +++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java @@ -81,7 +81,8 @@ public class VXQuery { /** * Constructor to use command line options passed. * - * @param opts Command line options object + * @param opts + * Command line options object */ public VXQuery(CmdLineOptions opts) { this.opts = opts; @@ -181,8 +182,13 @@ public class VXQuery { opts.showOET, opts.showRP); start = opts.timing ? 
new Date() : null; - XMLQueryCompiler compiler = new XMLQueryCompiler(listener, getNodeList(), opts.frameSize, - opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize); + + Map<String, NodeControllerInfo> nodeControllerInfos = null; + if (hcc != null) { + nodeControllerInfos = hcc.getNodeControllerInfos(); + } + XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize, + opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf); resultSetId = createResultSetId(); CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE), resultSetId, null); @@ -233,30 +239,13 @@ public class VXQuery { } /** - * Get cluster node configuration. - * - * @return Configuration of node controllers as array of Strings. - * @throws Exception - */ - private String[] getNodeList() throws Exception { - if (hcc != null) { - Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos(); - String[] nodeList = new String[nodeControllerInfos.size()]; - int index = 0; - for (String node : nodeControllerInfos.keySet()) { - nodeList[index++] = node; - } - return nodeList; - } - return new String[0]; - } - - /** * Creates a Hyracks dataset, if not already existing with the job frame size, and 1 reader. Allocates a new buffer of size specified in the frame of Hyracks * node. Creates new dataset reader with the current job ID and result set ID. Outputs the string in buffer for each frame. * - * @param spec JobSpecification object, containing frame size. Current specified job. - * @param writer Writer for output of job. + * @param spec + * JobSpecification object, containing frame size. Current specified job. + * @param writer + * Writer for output of job. * @throws Exception */ private void runJob(JobSpecification spec, PrintWriter writer) throws Exception { @@ -339,7 +328,8 @@ public class VXQuery { /** * Reads the contents of file given in query into a String. 
The file is always closed. For XML files UTF-8 encoding is used. * - * @param query The query with filename to be processed + * @param query + * The query with filename to be processed * @return UTF-8 formatted query string * @throws IOException */ @@ -361,8 +351,7 @@ public class VXQuery { * Helper class with fields and methods to handle all command line options */ private static class CmdLineOptions { - @Option(name = "-available-processors", - usage = "Number of available processors. (default: java's available processors)") + @Option(name = "-available-processors", usage = "Number of available processors. (default: java's available processors)") private int availableProcessors = -1; @Option(name = "-client-net-ip-address", usage = "IP Address of the ClusterController.") @@ -422,6 +411,9 @@ public class VXQuery { @Option(name = "-x", usage = "Bind an external variable") private Map<String, String> bindings = new HashMap<String, String>(); + @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files") + private String hdfsConf = null; + @Argument private List<String> arguments = new ArrayList<String>(); } http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/pom.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml index 59cc987..d244818 100644 --- a/vxquery-core/pom.xml +++ b/vxquery-core/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -63,7 +64,8 @@ <configuration> <target> <ant antfile="build-xslt.xml" target="build"> - <property name="build.gen-src.dir" value="${project.build.directory}/generated-sources/main/java" /> + <property name="build.gen-src.dir" + value="${project.build.directory}/generated-sources/main/java" /> <property name="src.code.dir" value="${basedir}/src/main/java" /> <property name="xslt.dir" value="${basedir}/src/main/xslt" /> <property name="classpath.xslt" refid="maven.compile.classpath" /> @@ -80,7 +82,8 @@ <configuration> <target> <ant antfile="build-site.xml" target="build"> - <property name="build.gen-site.dir" value="${project.build.directory}/generated-site/apt" /> + <property name="build.gen-site.dir" + value="${project.build.directory}/generated-site/apt" /> <property name="src.code.dir" value="${basedir}/src/main/java" /> <property name="xslt.dir" value="${basedir}/src/main/xslt" /> <property name="classpath.xslt" refid="maven.compile.classpath" /> @@ -121,16 +124,13 @@ <redirectTestOutputToFile>true</redirectTestOutputToFile> </configuration> </plugin> - <!-- - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-site-plugin</artifactId> - </plugin> - --> + <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-site-plugin</artifactId> + </plugin> --> </plugins> <pluginManagement> <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> + <!--This plugin's configuration is used to store Eclipse m2e settings + only. It has no influence on the Maven build itself. --> <plugin> <groupId>org.eclipse.m2e</groupId> <artifactId>lifecycle-mapping</artifactId> @@ -209,6 +209,16 @@ </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-hdfs-2.x</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-hdfs-core</artifactId> + </dependency> + + <dependency> <groupId>ant</groupId> <artifactId>ant-trax</artifactId> <scope>provided</scope> @@ -259,6 +269,21 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>2.7.0</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.7.0</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> </dependencies> <reporting> @@ -290,18 +315,10 @@ <dependencyLocationsEnabled>false</dependencyLocationsEnabled> </configuration> </plugin> - <!-- - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <version>2.5.2</version> - <configuration> - <effort>Min</effort> - <threshold>Normal</threshold> - <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile> - </configuration> - </plugin> - --> + <!-- <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId> + <version>2.5.2</version> <configuration> <effort>Min</effort> <threshold>Normal</threshold> + <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile> </configuration> + </plugin> --> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> 
http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java index fd20465..205e0b2 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java @@ -19,8 +19,8 @@ package org.apache.vxquery.compiler.rewriter; import java.util.LinkedList; import java.util.List; -import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild; import org.apache.vxquery.compiler.rewriter.rules.ConsolidateAssignAggregateRule; +import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild; import org.apache.vxquery.compiler.rewriter.rules.ConvertAssignToUnnestRule; import org.apache.vxquery.compiler.rewriter.rules.ConvertFromAlgebricksExpressionsRule; import org.apache.vxquery.compiler.rewriter.rules.ConvertToAlgebricksExpressionsRule; http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java index 53011d2..74220da 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java @@ -22,16 +22,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import 
org.apache.commons.lang3.mutable.Mutable; -import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; -import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; -import org.apache.vxquery.datamodel.values.ValueTag; -import org.apache.vxquery.functions.BuiltinFunctions; -import org.apache.vxquery.types.BuiltinTypeRegistry; -import org.apache.vxquery.types.Quantifier; -import org.apache.vxquery.types.SequenceType; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -46,6 +36,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; +import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; +import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.datamodel.values.ValueTag; +import org.apache.vxquery.functions.BuiltinFunctions; +import org.apache.vxquery.types.BuiltinTypeRegistry; +import org.apache.vxquery.types.Quantifier; +import org.apache.vxquery.types.SequenceType; public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { final ByteBufferInputStream bbis = new ByteBufferInputStream(); @@ -54,14 +52,13 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); /** - * Get the constant value for the collection. 
Return null for not a collection. + * Get the arguments for the collection and collection-with-tag. Return null for not a collection. * * @param opRef * Logical operator * @return collection name */ - protected String getCollectionName(Mutable<ILogicalOperator> opRef) { - VXQueryConstantValue constantValue; + protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) { AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) { @@ -82,13 +79,32 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { return null; } AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; - if (!functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) { + if (!functionCall.getFunctionIdentifier() + .equals(BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier()) + && !functionCall.getFunctionIdentifier() + .equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) { return null; } - ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(0).getValue(); + // Get arguments + int size = functionCall.getArguments().size(); + if (size > 0) { + String args[] = new String[size]; + for (int i = 0; i < size; i++) { + args[i] = getArgument(functionCall, opRef, i); + } + return args; + } + return null; + } + + private String getArgument(AbstractFunctionCallExpression functionCall, Mutable<ILogicalOperator> opRef, int pos) { + VXQueryConstantValue constantValue; + ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(pos).getValue(); if (logicalExpression2.getExpressionTag() != LogicalExpressionTag.VARIABLE) { return null; + } else if (logicalExpression2 == null) { + return null; } VariableReferenceExpression vre = (VariableReferenceExpression) logicalExpression2; Mutable<ILogicalOperator> opRef3 = 
OperatorToolbox.findProducerOf(opRef, vre.getVariableReference()); @@ -111,7 +127,6 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { } else { return null; } - // Constant value is now in a TaggedValuePointable. Convert the value into a java String. tvp.set(constantValue.getValue(), 0, constantValue.getValue().length); String collectionName = null; @@ -121,11 +136,12 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { bbis.setByteBuffer(ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(), stringp.getStartOffset(), stringp.getLength() + stringp.getStartOffset())), 0); collectionName = di.readUTF(); + return collectionName; } catch (IOException e) { e.printStackTrace(); } } - return collectionName; + return null; } @Override http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java index cc857a1..8ed8bb1 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java @@ -64,9 +64,10 @@ public class IntroduceCollectionRule extends AbstractCollectionRule { @Override public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context; - String collectionName = getCollectionName(opRef); + String args[] = getCollectionName(opRef); - if (collectionName != null) { + if (args != null) { + String collectionName = args[0]; // Build the new operator and update the query plan. 
int collectionId = vxqueryContext.newCollectionId(); VXQueryCollectionDataSource ds = VXQueryCollectionDataSource.create(collectionId, collectionName, @@ -74,6 +75,12 @@ public class IntroduceCollectionRule extends AbstractCollectionRule { if (ds != null) { ds.setTotalDataSources(vxqueryContext.getTotalDataSources()); + // Check if the call is for collection-with-tag + if (args.length == 2) { + ds.setTotalDataSources(vxqueryContext.getTotalDataSources()); + ds.setTag(args[1]); + } + // Known to be true because of collection name. AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); UnnestOperator unnest = (UnnestOperator) op; http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml index 38f03a4..3b9371d 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml @@ -128,6 +128,14 @@ <!-- Collection operator is added during the rewrite rules phase. --> </function> + <!-- fn:collection-with-tag($arg1 as xs:string?, $arg2 as xs:string?) as node()* --> + <function name="fn:collection-with-tag"> + <param name="arg1" type="xs:string?"/> + <param name="arg2" type="xs:string?"/> + <return type="node()*"/> + <!-- CollectionWithTag operator is added during the rewrite rules phase. --> + </function> + <!-- fn:compare($comparand1 as xs:string?, $comparand2 as xs:string?) as xs:integer? 
--> <function name="fn:compare"> <param name="comparand1" type="xs:string?"/> http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java new file mode 100644 index 0000000..dcbfe94 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.vxquery.hdfs2; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringReader; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.hdfs.ContextFactory; +import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory; +import org.w3c.dom.Document; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +public class HDFSFunctions { + + private Configuration conf; + private FileSystem fs; + private String conf_path; + private Job job; + private InputFormat inputFormat; + private List<InputSplit> splits; + private ArrayList<ArrayList<String>> nodes; + private HashMap<Integer, String> schedule; + private final String TEMP = "java.io.tmpdir"; + private final String dfs_path = "vxquery_splits_schedule.txt"; + private final String filepath = 
System.getProperty(TEMP) + "splits_schedule.txt"; + protected static final Logger LOGGER = Logger.getLogger(HDFSFunctions.class.getName()); + private final Map<String, NodeControllerInfo> nodeControllerInfos; + + /** + * Create the configuration and add the paths for core-site and hdfs-site as resources. + * Initialize an instance of HDFS FileSystem for this configuration. + * + * @param nodeControllerInfos + * @param hdfsConf + */ + public HDFSFunctions(Map<String, NodeControllerInfo> nodeControllerInfos, String hdfsConf) { + this.conf = new Configuration(); + this.nodeControllerInfos = nodeControllerInfos; + this.conf_path = hdfsConf; + } + + /** + * Create the needed objects for reading the splits of the filepath given as argument. + * This method should run before the scheduleSplits method. + * + * @param filepath + */ + @SuppressWarnings({ "deprecation", "unchecked" }) + public void setJob(String filepath, String tag) { + try { + conf.set("start_tag", "<" + tag + ">"); + conf.set("end_tag", "</" + tag + ">"); + job = new Job(conf, "Read from HDFS"); + Path input = new Path(filepath); + FileInputFormat.addInputPath(job, input); + job.setInputFormatClass(XmlCollectionWithTagInputFormat.class); + inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration()); + splits = inputFormat.getSplits(job); + } catch (IOException | ClassNotFoundException | InterruptedException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } + + /** + * Returns true if the file path exists or it is located somewhere in the home directory of the user that called the function. + * Searches in subdirectories of the home directory too. 
+ * + * @param filename + * @return + * @throws IOException + * @throws IllegalArgumentException + */ + public boolean isLocatedInHDFS(String filename) throws IllegalArgumentException, IOException { + //search file path + if (fs.exists(new Path(filename))) { + return true; + } + return searchInDirectory(fs.getHomeDirectory(), filename) != null; + } + + /** + * Searches the given directory for the file. + * + * @param directory + * to search + * @param filename + * of file we want + * @return path if file exists in this directory.else return null. + */ + public Path searchInDirectory(Path directory, String filename) { + //Search the files and folder in this Path to find the one matching the filename. + try { + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + String[] parts; + Path path; + while (it.hasNext()) { + path = it.next().getPath(); + parts = path.toString().split("/"); + if (parts[parts.length - 1].equals(filename)) { + return path; + } + } + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + return null; + } + + /** + * Read the cluster properties file and locate the HDFS_CONF variable that is the directory path for the + * hdfs configuration if the system environment variable HDFS_CONF is not set. + * + * @return true if is successfully finds the Hadoop/HDFS home directory + */ + private boolean locateConf() { + if (this.conf_path == null) { + //As a last resort, try getting the configuration from the system environment + //Some systems won't have this set. + this.conf_path = System.getenv("HADOOP_CONF_DIR"); + } + return this.conf_path != null; + } + + /** + * Upload a file/directory to HDFS.Filepath is the path in the local file system.dir is the destination path. 
+ * + * @param filepath + * @param dir + * @return + */ + public boolean put(String filepath, String dir) { + if (this.fs != null) { + Path path = new Path(filepath); + Path dest = new Path(dir); + try { + if (fs.exists(dest)) { + fs.delete(dest, true); //recursive delete + } + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + try { + fs.copyFromLocalFile(path, dest); + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } + return false; + } + + /** + * Get instance of the HDFSfile system if it is configured correctly. + * Return null if there is no instance. + * + * @return + */ + public FileSystem getFileSystem() { + if (locateConf()) { + conf.addResource(new Path(this.conf_path + "/core-site.xml")); + conf.addResource(new Path(this.conf_path + "/hdfs-site.xml")); + try { + fs = FileSystem.get(conf); + return this.fs; + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } else { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe("Could not locate HDFS configuration folder."); + } + } + return null; + } + + /** + * Create a HashMap that has as key the hostname and values the splits that belong to this hostname; + * + * @return + * @throws IOException + */ + public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException { + HashMap<String, ArrayList<Integer>> splits_map = new HashMap<String, ArrayList<Integer>>(); + ArrayList<Integer> temp; + int i = 0; + String hostname; + for (InputSplit s : this.splits) { + SplitLocationInfo info[] = s.getLocationInfo(); + hostname = info[0].getLocation(); + if (splits_map.containsKey(hostname)) { + temp = splits_map.get(hostname); + temp.add(i); + } else { + temp = new ArrayList<Integer>(); + temp.add(i); + splits_map.put(hostname, temp); + } + i++; + } + + return splits_map; + } + + public void scheduleSplits() throws 
IOException, ParserConfigurationException, SAXException { + schedule = new HashMap<Integer, String>(); + ArrayList<String> empty = new ArrayList<String>(); + HashMap<String, ArrayList<Integer>> splits_map = this.getLocationsOfSplits(); + readNodesFromXML(); + int count = this.splits.size(); + + ArrayList<Integer> splits; + String node; + for (ArrayList<String> info : this.nodes) { + node = info.get(1); + if (splits_map.containsKey(node)) { + splits = splits_map.get(node); + for (Integer split : splits) { + schedule.put(split, node); + count--; + } + splits_map.remove(node); + } else { + empty.add(node); + } + } + + //Check if every split got assigned to a node + if (count != 0) { + ArrayList<Integer> remaining = new ArrayList<Integer>(); + // Find remaining splits + for (InputSplit s : this.splits) { + int i = 0; + if (!schedule.containsKey(i)) { + remaining.add(i); + } + } + + if (empty.size() != 0) { + int node_number = 0; + for (int split : remaining) { + if (node_number == empty.size()) { + node_number = 0; + } + schedule.put(split, empty.get(node_number)); + node_number++; + } + } + } + } + + /** + * Read the hostname and the ip address of every node from the xml cluster configuration file. + * Save the information inside nodes. + * + * @throws ParserConfigurationException + * @throws IOException + * @throws SAXException + */ + public void readNodesFromXML() throws ParserConfigurationException, SAXException, IOException { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder; + dBuilder = dbFactory.newDocumentBuilder(); + nodes = new ArrayList<ArrayList<String>>(); + for (NodeControllerInfo ncInfo : nodeControllerInfos.values()) { + //Will this include the master node? Is that bad? 
+ ArrayList<String> info = new ArrayList<String>(); + info.add(ncInfo.getNodeId()); + info.add(ncInfo.getNetworkAddress().getAddress()); + nodes.add(info); + } + } + + /** + * Writes the schedule to a temporary file, then uploads the file to the HDFS. + * + * @throws UnsupportedEncodingException + * @throws FileNotFoundException + */ + public void addScheduleToDistributedCache() throws FileNotFoundException, UnsupportedEncodingException { + PrintWriter writer; + writer = new PrintWriter(filepath, "UTF-8"); + for (int split : this.schedule.keySet()) { + writer.write(split + "," + this.schedule.get(split)); + } + writer.close(); + // Add file to HDFS + this.put(filepath, dfs_path); + } + + public RecordReader getReader() { + + List<FileSplit> fileSplits = new ArrayList<FileSplit>(); + for (int i = 0; i < splits.size(); i++) { + fileSplits.add((FileSplit) splits.get(i)); + } + FileSplitsFactory splitsFactory; + try { + splitsFactory = new FileSplitsFactory(fileSplits); + List<FileSplit> inputSplits = splitsFactory.getSplits(); + ContextFactory ctxFactory = new ContextFactory(); + int size = inputSplits.size(); + for (int i = 0; i < size; i++) { + /** + * read the split + */ + TaskAttemptContext context; + try { + context = ctxFactory.createContext(job.getConfiguration(), i); + RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context); + reader.initialize(inputSplits.get(i), context); + return reader; + } catch (IOException | InterruptedException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } + } catch (HyracksDataException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + return null; + } + + /** + * @return schedule. + */ + public HashMap<Integer, String> getSchedule() { + return this.schedule; + } + + /** + * Return the splits belonging to this node for the existing schedule. 
+ * + * @param node + * @return + */ + public ArrayList<Integer> getScheduleForNode(String node) { + ArrayList<Integer> node_schedule = new ArrayList<Integer>(); + for (int split : this.schedule.keySet()) { + if (node.equals(this.schedule.get(split))) { + node_schedule.add(split); + } + } + return node_schedule; + } + + public List<InputSplit> getSplits() { + return this.splits; + } + + public Job getJob() { + return this.job; + } + + public InputFormat getinputFormat() { + return this.inputFormat; + } + + public Document convertStringToDocument(String xmlStr) { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder; + try { + builder = factory.newDocumentBuilder(); + Document doc = builder.parse(new InputSource(new StringReader(xmlStr))); + return doc; + } catch (Exception e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java new file mode 100644 index 0000000..1d053b6 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.hdfs2; + +import com.google.common.io.Closeables; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + +import java.io.IOException; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; + +/** + * Reads records that are delimited by a specific begin/end tag. 
+ */ +public class XmlCollectionWithTagInputFormat extends TextInputFormat { + + public static String STARTING_TAG; + public static String ENDING_TAG; + + @Override + public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { + try { + STARTING_TAG = context.getConfiguration().get("start_tag"); + ENDING_TAG = context.getConfiguration().get("end_tag"); + return new XmlRecordReader((FileSplit) split, context.getConfiguration()); + } catch (IOException ioe) { + return null; + } + } + + /** + * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified + * by the end tag + */ + public static class XmlRecordReader extends RecordReader<LongWritable, Text> { + + private final byte[] end_tag; + private final byte[] start_tag; + private final long start; + private final long end; + private final FSDataInputStream fsin; + private final DataOutputBuffer buffer = new DataOutputBuffer(); + private LongWritable currentKey; + private Text currentValue; + BlockLocation[] blocks; + public static byte[] nl = "\n".getBytes(); + + public XmlRecordReader(FileSplit split, Configuration conf) throws IOException { + end_tag = ENDING_TAG.getBytes(Charsets.UTF_8); + start_tag = STARTING_TAG.getBytes(Charsets.UTF_8); + + // open the file and seek to the start of the split + start = split.getStart(); + // set the end of the file + end = start + split.getLength(); + Path file = split.getPath(); + FileSystem fs = file.getFileSystem(conf); + FileStatus fStatus = fs.getFileStatus(file); + blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen()); + // seek the start of file + fsin = fs.open(split.getPath()); + fsin.seek(start); + } + + /** + * Get next block item + * + * @param key + * @param value + * @return + * @throws IOException + */ + private boolean next(LongWritable key, Text value) throws IOException { + if (fsin.getPos() < end) { + try { + if (readBlock(true)) { + key.set(fsin.getPos()); + 
value.set(buffer.getData(), 0, buffer.getLength()); + return true; + } + } finally { + buffer.reset(); + } + } + return false; + } + + @Override + public void close() throws IOException { + Closeables.close(fsin, true); + } + + @Override + public float getProgress() throws IOException { + return (fsin.getPos() - start) / (float) (end - start); + } + + /** + * Read the block from start till end and after that until you find a closing tag + * + * @param withinBlock + * @return + * @throws IOException + */ + private boolean readBlock(boolean withinBlock) throws IOException { + boolean read = false; + + while (true) { + if (fsin.getPos() < end) { + if (readUntilMatch(start_tag, false)) { + buffer.write(start_tag); + readUntilMatch(end_tag, true); + read = true; + } + } else { + return read; + } + } + } + + /** + * Read from block(s) until you reach the end of file or find a matching bytes with match[] + * + * @param match + * @param withinBlock + * @return + * @throws IOException + */ + private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { + int i = 0; + while (true) { + int b = fsin.read(); + // end of file: + if (b == -1) { + return false; + } + // save to buffer: + if (withinBlock) { + buffer.write(b); + } + + // check if we're matching: + if (b == match[i]) { + i++; + if (i >= match.length) { + return true; + } + } else { + i = 0; + } + // see if we've passed the stop point: + if (!withinBlock && i == 0 && fsin.getPos() >= end) { + return false; + } + } + } + + private int nextBlock() throws IOException { + long pos = fsin.getPos(); + long block_length; + for (int i = 0; i < blocks.length; i++) { + block_length = blocks[i].getOffset() + blocks[i].getLength(); + if (pos == block_length) { + return i + 1; + } + } + return 0; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { 
+ return currentValue; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + currentKey = new LongWritable(); + currentValue = new Text(); + return next(currentKey, currentValue); + } + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java index 62d1ca7..3c2d6aa 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java @@ -37,6 +37,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { private String[] collectionPartitions; private final List<Integer> childSeq; private int totalDataSources; + private String tag; private final Object[] types; @@ -60,6 +61,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { } }; this.childSeq = new ArrayList<Integer>(); + this.tag = null; } public int getTotalDataSources() { @@ -77,7 +79,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { public String[] getPartitions() { return collectionPartitions; } - + public void setPartitions(String[] collectionPartitions) { this.collectionPartitions = collectionPartitions; } @@ -86,6 +88,14 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { return collectionPartitions.length; } + public String getTag() { + return this.tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + @Override public String getId() { return 
collectionName; http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index d5966b8..b8dca63 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -16,15 +16,37 @@ */ package org.apache.vxquery.metadata; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.xml.parsers.ParserConfigurationException; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameFieldAppender; 
import org.apache.hyracks.api.comm.VSizeFrame; @@ -38,10 +60,14 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.hdfs.ContextFactory; +import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory; import org.apache.vxquery.context.DynamicContext; +import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; import org.apache.vxquery.xmlparser.TreeNodeIdProvider; import org.apache.vxquery.xmlparser.XMLParser; +import org.xml.sax.SAXException; public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -50,15 +76,23 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private String[] collectionPartitions; private List<Integer> childSeq; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); + private HDFSFunctions hdfs; + private String tag; + private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n"; + private final String hdfsConf; + private final Map<String, NodeControllerInfo> nodeControllerInfos; public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, - RecordDescriptor rDesc) { + RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { super(spec, 1, 1); collectionPartitions = ds.getPartitions(); dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); recordDescriptors[0] = rDesc; + this.tag = ds.getTag(); + this.hdfsConf = hdfsConf; + 
this.nodeControllerInfos = nodeControllerInfos; } @Override @@ -83,31 +117,155 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void open() throws HyracksDataException { appender.reset(frame, true); writer.open(); + hdfs = new HDFSFunctions(nodeControllerInfos, hdfsConf); } @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { fta.reset(buffer); String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); - File collectionDirectory = new File(collectionModifiedName); - - // Go through each tuple. - if (collectionDirectory.isDirectory()) { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - @SuppressWarnings("unchecked") - Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), - TrueFileFilter.INSTANCE); - while (it.hasNext()) { - File xmlDocument = it.next(); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); + if (!collectionModifiedName.contains("hdfs:/")) { + File collectionDirectory = new File(collectionModifiedName); + //check if directory is in the local file system + if (collectionDirectory.exists()) { + // Go through each tuple. 
+ if (collectionDirectory.isDirectory()) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, + new VXQueryIOFileFilter(), TrueFileFilter.INSTANCE); + while (it.hasNext()) { + File xmlDocument = it.next(); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); + } + parser.parseElements(xmlDocument, writer, tupleIndex); + } } - parser.parseElements(xmlDocument, writer, tupleIndex); + } else { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); } } } else { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); + // Else check in HDFS file system + // Get instance of the HDFS filesystem + FileSystem fs = hdfs.getFileSystem(); + if (fs != null) { + collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", ""); + Path directory = new Path(collectionModifiedName); + Path xmlDocument; + if (tag != null) { + hdfs.setJob(directory.toString(), tag); + tag = "<" + tag + ">"; + Job job = hdfs.getJob(); + InputFormat inputFormat = hdfs.getinputFormat(); + try { + hdfs.scheduleSplits(); + ArrayList<Integer> schedule = hdfs + .getScheduleForNode(InetAddress.getLocalHost().getHostAddress()); + List<InputSplit> splits = hdfs.getSplits(); + List<FileSplit> fileSplits = new ArrayList<FileSplit>(); + for (int i : schedule) { + fileSplits.add((FileSplit) splits.get(i)); + } + FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits); + List<FileSplit> inputSplits = splitsFactory.getSplits(); + ContextFactory ctxFactory = new ContextFactory(); + int size = inputSplits.size(); + InputStream stream; + String value; + RecordReader reader; + TaskAttemptContext context; + for (int i = 0; i < size; i++) { + //read split + 
context = ctxFactory.createContext(job.getConfiguration(), i); + try { + reader = inputFormat.createRecordReader(inputSplits.get(i), context); + reader.initialize(inputSplits.get(i), context); + while (reader.nextKeyValue()) { + value = reader.getCurrentValue().toString(); + //Split value if it contains more than one item with the tag + if (StringUtils.countMatches(value, tag) > 1) { + String items[] = value.split(tag); + for (String item : items) { + if (item.length() > 0) { + item = START_TAG + tag + item; + stream = new ByteArrayInputStream( + item.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); + } + } + } else { + value = START_TAG + value; + //create an input stream to the file currently reading and send it to parser + stream = new ByteArrayInputStream( + value.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); + } + } + + } catch (InterruptedException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } + + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } catch (ParserConfigurationException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } catch (SAXException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } else { + try { + //check if the path exists and is a directory + if (fs.exists(directory) && fs.isDirectory(directory)) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + //read every file in the directory + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + while (it.hasNext()) { + xmlDocument = it.next().getPath(); + if (fs.isFile(xmlDocument)) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine( + "Starting to read XML document: " + xmlDocument.getName()); + } + //create an input stream to the file currently reading and send it to parser + InputStream 
in = fs.open(xmlDocument).getWrappedStream(); + parser.parseHDFSElements(in, writer, fta, tupleIndex); + } + } + } + } else { + throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" + + directory + ") passed to collection."); + } + } catch (FileNotFoundException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } + try { + fs.close(); + } catch (IOException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(e.getMessage()); + } + } + } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/3fc2d6c2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java index 238f6d3..820c365 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.vxquery.context.StaticContext; - import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -45,6 +43,7 @@ import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; +import 
org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -52,16 +51,22 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; +import org.apache.vxquery.context.StaticContext; public class VXQueryMetadataProvider implements IMetadataProvider<String, String> { private final String[] nodeList; private final Map<String, File> sourceFileMap; private final StaticContext staticCtx; + private final String hdfsConf; + private final Map<String, NodeControllerInfo> nodeControllerInfos; - public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx) { + public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx, + String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { this.nodeList = nodeList; this.sourceFileMap = sourceFileMap; this.staticCtx = staticCtx; + this.hdfsConf = hdfsConf; + this.nodeControllerInfos = nodeControllerInfos; } @Override @@ -82,7 +87,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig) - throws AlgebricksException { + throws AlgebricksException { VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource; if (sourceFileMap != null) { final int len = ds.getPartitions().length; @@ -95,7 +100,8 @@ public class 
VXQueryMetadataProvider implements IMetadataProvider<String, String ds.setPartitions(collectionPartitions); } RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); - IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc); + IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf, + this.nodeControllerInfos); AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount()); return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint); @@ -129,7 +135,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String @Override public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) - throws AlgebricksException { + throws AlgebricksException { throw new UnsupportedOperationException(); } @@ -155,7 +161,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) - throws AlgebricksException { + throws AlgebricksException { throw new UnsupportedOperationException(); }
