Add hadoop-conf as a variable; change collectionwithtag to collection-with-tag
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/4bd85a17 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/4bd85a17 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/4bd85a17 Branch: refs/heads/steven/hdfs Commit: 4bd85a17ad610a8f5f1195bc7bc9e3e56d7b4acb Parents: c85e556 Author: efikalti <[email protected]> Authored: Fri Feb 26 14:16:48 2016 +0200 Committer: efikalti <[email protected]> Committed: Fri Feb 26 14:16:48 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/vxquery/cli/VXQuery.java | 739 ++++++++++--------- .../rewriter/rules/AbstractCollectionRule.java | 2 +- .../vxquery/functions/builtin-functions.xml | 2 +- .../VXQueryCollectionOperatorDescriptor.java | 5 +- .../metadata/VXQueryMetadataProvider.java | 7 +- .../xmlquery/query/XMLQueryCompiler.java | 10 +- .../XQuery/HDFS/Aggregate/maxvalueHDFS.xq | 6 +- 7 files changed, 398 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java ---------------------------------------------------------------------- diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java index 0e88539..db95468 100644 --- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java +++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java @@ -63,366 +63,383 @@ import edu.uci.ics.hyracks.control.nc.NodeControllerService; import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; public class VXQuery { - private final CmdLineOptions opts; - - private ClusterControllerService cc; - private NodeControllerService[] ncs; - private IHyracksClientConnection hcc; - private IHyracksDataset hds; - - private ResultSetId resultSetId; 
- private static List<String> timingMessages = new ArrayList<String>(); - private static long sumTiming; - private static long sumSquaredTiming; - private static long minTiming = Long.MAX_VALUE; - private static long maxTiming = Long.MIN_VALUE; - - /** - * Constructor to use command line options passed. - * - * @param opts - * Command line options object - */ - public VXQuery(CmdLineOptions opts) { - this.opts = opts; - } - - /** - * Main method to get command line options and execute query process. - * - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - Date start = new Date(); - final CmdLineOptions opts = new CmdLineOptions(); - CmdLineParser parser = new CmdLineParser(opts); - - // parse command line options, give error message if no arguments passed - try { - parser.parseArgument(args); - } catch (Exception e) { - parser.printUsage(System.err); - return; - } - if (opts.arguments.isEmpty()) { - parser.printUsage(System.err); - return; - } - VXQuery vxq = new VXQuery(opts); - vxq.execute(); - // if -timing argument passed, show the starting and ending times - if (opts.timing) { - Date end = new Date(); - timingMessage("Execution time: " + (end.getTime() - start.getTime()) + " ms"); - if (opts.repeatExec > opts.timingIgnoreQueries) { - long mean = sumTiming / (opts.repeatExec - opts.timingIgnoreQueries); - double sd = Math - .sqrt(sumSquaredTiming / (opts.repeatExec - new Integer(opts.timingIgnoreQueries).doubleValue()) - - mean * mean); - timingMessage("Average execution time: " + mean + " ms"); - timingMessage("Standard deviation: " + String.format("%.4f", sd)); - timingMessage("Coefficient of variation: " + String.format("%.4f", (sd / mean))); - timingMessage("Minimum execution time: " + minTiming + " ms"); - timingMessage("Maximum execution time: " + maxTiming + " ms"); - } - System.out.println("Timing Summary:"); - for (String time : timingMessages) { - System.out.println(" " + time); - } - } - - } - - /** 
- * Creates a new Hyracks connection with: the client IP address and port provided, if IP address is provided in command line. Otherwise create a new virtual - * cluster with Hyracks nodes. Queries passed are run either way. After running queries, if a virtual cluster has been created, it is shut down. - * - * @throws Exception - */ - private void execute() throws Exception { - System.setProperty("vxquery.buffer_size", Integer.toString(opts.bufferSize)); - - if (opts.clientNetIpAddress != null) { - hcc = new HyracksConnection(opts.clientNetIpAddress, opts.clientNetPort); - runQueries(); - } else { - if (!opts.compileOnly) { - startLocalHyracks(); - } - try { - runQueries(); - } finally { - if (!opts.compileOnly) { - stopLocalHyracks(); - } - } - } - } - - /** - * Reads the contents of the files passed in the list of arguments to a string. If -showquery argument is passed, output the query as string. Run the query - * for the string. - * - * @throws IOException - * @throws SystemException - * @throws Exception - */ - private void runQueries() throws IOException, SystemException, Exception { - Date start = null; - Date end = null; - for (String query : opts.arguments) { - String qStr = slurp(query); - if (opts.showQuery) { - System.err.println(qStr); - } - - VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET, - opts.showOET, opts.showRP); - - start = opts.timing ? 
new Date() : null; - XMLQueryCompiler compiler = new XMLQueryCompiler(listener, getNodeList(), opts.frameSize, - opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize); - resultSetId = createResultSetId(); - CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE), - resultSetId, null); - compiler.compile(query, new StringReader(qStr), ccb, opts.optimizationLevel); - // if -timing argument passed, show the starting and ending times - if (opts.timing) { - end = new Date(); - timingMessage("Compile time: " + (end.getTime() - start.getTime()) + " ms"); - } - if (opts.compileOnly) { - continue; - } - - Module module = compiler.getModule(); - JobSpecification js = module.getHyracksJobSpecification(); - - DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext()); - js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory())); - - OutputStream resultStream = System.out; - if (opts.resultFile != null) { - resultStream = new FileOutputStream(new File(opts.resultFile)); - } - - PrintWriter writer = new PrintWriter(resultStream, true); - // Repeat execution for number of times provided in -repeatexec argument - for (int i = 0; i < opts.repeatExec; ++i) { - start = opts.timing ? new Date() : null; - runJob(js, writer); - // if -timing argument passed, show the starting and ending times - if (opts.timing) { - end = new Date(); - long currentRun = end.getTime() - start.getTime(); - if ((i + 1) > opts.timingIgnoreQueries) { - sumTiming += currentRun; - sumSquaredTiming += currentRun * currentRun; - if (currentRun < minTiming) { - minTiming = currentRun; - } - if (maxTiming < currentRun) { - maxTiming = currentRun; - } - } - timingMessage("Job (" + (i + 1) + ") execution time: " + currentRun + " ms"); - } - } - } - } - - /** - * Get cluster node configuration. - * - * @return Configuration of node controllers as array of Strings. 
- * @throws Exception - */ - private String[] getNodeList() throws Exception { - Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos(); - String[] nodeList = new String[nodeControllerInfos.size()]; - int index = 0; - for (String node : nodeControllerInfos.keySet()) { - nodeList[index++] = node; - } - return nodeList; - } - - /** - * Creates a Hyracks dataset, if not already existing with the job frame size, and 1 reader. Allocates a new buffer of size specified in the frame of Hyracks - * node. Creates new dataset reader with the current job ID and result set ID. Outputs the string in buffer for each frame. - * - * @param spec - * JobSpecification object, containing frame size. Current specified job. - * @param writer - * Writer for output of job. - * @throws Exception - */ - private void runJob(JobSpecification spec, PrintWriter writer) throws Exception { - int nReaders = 1; - if (hds == null) { - hds = new HyracksDataset(hcc, spec.getFrameSize(), nReaders); - } - - JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); - - ByteBuffer buffer = ByteBuffer.allocate(spec.getFrameSize()); - IHyracksDatasetReader reader = hds.createReader(jobId, resultSetId); - IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize()); - buffer.clear(); - - while (reader.read(buffer) > 0) { - buffer.clear(); - writer.print(ResultUtils.getStringFromBuffer(buffer, frameTupleAccessor)); - writer.flush(); - } - - hcc.waitForCompletion(jobId); - } - - /** - * Create a unique result set id to get the correct query back from the cluster. - * - * @return Result Set id generated with current system time. - */ - protected ResultSetId createResultSetId() { - return new ResultSetId(System.nanoTime()); - } - - /** - * Start local virtual cluster with cluster controller node and node controller nodes. IP address provided for node controller is localhost. 
Unassigned ports - * 39000 and 39001 are used for client and cluster port respectively. Creates a new Hyracks connection with the IP address and client ports. - * - * @throws Exception - */ - public void startLocalHyracks() throws Exception { - CCConfig ccConfig = new CCConfig(); - ccConfig.clientNetIpAddress = "127.0.0.1"; - ccConfig.clientNetPort = 39000; - ccConfig.clusterNetIpAddress = "127.0.0.1"; - ccConfig.clusterNetPort = 39001; - ccConfig.httpPort = 39002; - ccConfig.profileDumpPeriod = 10000; - cc = new ClusterControllerService(ccConfig); - cc.start(); - - ncs = new NodeControllerService[opts.localNodeControllers]; - for (int i = 0; i < ncs.length; i++) { - NCConfig ncConfig = new NCConfig(); - ncConfig.ccHost = "localhost"; - ncConfig.ccPort = 39001; - ncConfig.clusterNetIPAddress = "127.0.0.1"; - ncConfig.dataIPAddress = "127.0.0.1"; - ncConfig.resultIPAddress = "127.0.0.1"; - ncConfig.nodeId = "nc" + (i + 1); - ncConfig.ioDevices = Files.createTempDirectory(ncConfig.nodeId).toString(); - ncs[i] = new NodeControllerService(ncConfig); - ncs[i].start(); - } - - hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); - } - - /** - * Shuts down the virtual cluster, along with all nodes and node execution, network and queue managers. - * - * @throws Exception - */ - public void stopLocalHyracks() throws Exception { - for (int i = 0; i < ncs.length; i++) { - ncs[i].stop(); - } - cc.stop(); - } - - /** - * Reads the contents of file given in query into a String. The file is always closed. For XML files UTF-8 encoding is used. - * - * @param query - * The query with filename to be processed - * @return UTF-8 formatted query string - * @throws IOException - */ - private static String slurp(String query) throws IOException { - return FileUtils.readFileToString(new File(query), "UTF-8"); - } - - /** - * Save and print out the timing message. 
- * - * @param message - */ - private static void timingMessage(String message) { - System.out.println(message); - timingMessages.add(message); - } - - /** - * Helper class with fields and methods to handle all command line options - */ - private static class CmdLineOptions { - @Option(name = "-available-processors", usage = "Number of available processors. (default: java's available processors)") - private int availableProcessors = -1; - - @Option(name = "-client-net-ip-address", usage = "IP Address of the ClusterController.") - private String clientNetIpAddress = null; - - @Option(name = "-client-net-port", usage = "Port of the ClusterController. (default: 1098)") - private int clientNetPort = 1098; - - @Option(name = "-local-node-controllers", usage = "Number of local node controllers. (default: 1)") - private int localNodeControllers = 1; - - @Option(name = "-frame-size", usage = "Frame size in bytes. (default: 65,536)") - private int frameSize = 65536; - - @Option(name = "-join-hash-size", usage = "Join hash size in bytes. (default: 67,108,864)") - private long joinHashSize = -1; - - @Option(name = "-maximum-data-size", usage = "Maximum possible data size in bytes. (default: 150,323,855,000)") - private long maximumDataSize = -1; - - @Option(name = "-buffer-size", usage = "Disk read buffer size in bytes.") - private int bufferSize = -1; - - @Option(name = "-O", usage = "Optimization Level. 
(default: Full Optimization)") - private int optimizationLevel = Integer.MAX_VALUE; - - @Option(name = "-showquery", usage = "Show query string.") - private boolean showQuery; - - @Option(name = "-showast", usage = "Show abstract syntax tree.") - private boolean showAST; - - @Option(name = "-showtet", usage = "Show translated expression tree.") - private boolean showTET; - - @Option(name = "-showoet", usage = "Show optimized expression tree.") - private boolean showOET; - - @Option(name = "-showrp", usage = "Show Runtime plan.") - private boolean showRP; - - @Option(name = "-compileonly", usage = "Compile the query and stop.") - private boolean compileOnly; - - @Option(name = "-repeatexec", usage = "Number of times to repeat execution.") - private int repeatExec = 1; - - @Option(name = "-result-file", usage = "File path to save the query result.") - private String resultFile = null; - - @Option(name = "-timing", usage = "Produce timing information.") - private boolean timing; - - @Option(name = "-timing-ignore-queries", usage = "Ignore the first X number of quereies.") - private int timingIgnoreQueries = 2; - - @Option(name = "-x", usage = "Bind an external variable") - private Map<String, String> bindings = new HashMap<String, String>(); - - @Argument - private List<String> arguments = new ArrayList<String>(); - } + private final CmdLineOptions opts; + + private ClusterControllerService cc; + private NodeControllerService[] ncs; + private IHyracksClientConnection hcc; + private IHyracksDataset hds; + + private ResultSetId resultSetId; + private static List<String> timingMessages = new ArrayList<String>(); + private static long sumTiming; + private static long sumSquaredTiming; + private static long minTiming = Long.MAX_VALUE; + private static long maxTiming = Long.MIN_VALUE; + + /** + * Constructor to use command line options passed. 
+ * + * @param opts + * Command line options object + */ + public VXQuery(CmdLineOptions opts) { + this.opts = opts; + } + + /** + * Main method to get command line options and execute query process. + * + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + Date start = new Date(); + final CmdLineOptions opts = new CmdLineOptions(); + CmdLineParser parser = new CmdLineParser(opts); + + // parse command line options, give error message if no arguments passed + try { + parser.parseArgument(args); + } catch (Exception e) { + parser.printUsage(System.err); + return; + } + if (opts.arguments.isEmpty()) { + parser.printUsage(System.err); + return; + } + VXQuery vxq = new VXQuery(opts); + vxq.execute(); + // if -timing argument passed, show the starting and ending times + if (opts.timing) { + Date end = new Date(); + timingMessage("Execution time: " + (end.getTime() - start.getTime()) + " ms"); + if (opts.repeatExec > opts.timingIgnoreQueries) { + long mean = sumTiming / (opts.repeatExec - opts.timingIgnoreQueries); + double sd = Math + .sqrt(sumSquaredTiming / (opts.repeatExec - new Integer(opts.timingIgnoreQueries).doubleValue()) + - mean * mean); + timingMessage("Average execution time: " + mean + " ms"); + timingMessage("Standard deviation: " + String.format("%.4f", sd)); + timingMessage("Coefficient of variation: " + String.format("%.4f", (sd / mean))); + timingMessage("Minimum execution time: " + minTiming + " ms"); + timingMessage("Maximum execution time: " + maxTiming + " ms"); + } + System.out.println("Timing Summary:"); + for (String time : timingMessages) { + System.out.println(" " + time); + } + } + + } + + /** + * Creates a new Hyracks connection with: the client IP address and port + * provided, if IP address is provided in command line. Otherwise create a + * new virtual cluster with Hyracks nodes. Queries passed are run either + * way. 
After running queries, if a virtual cluster has been created, it is + * shut down. + * + * @throws Exception + */ + private void execute() throws Exception { + System.setProperty("vxquery.buffer_size", Integer.toString(opts.bufferSize)); + + if (opts.clientNetIpAddress != null) { + hcc = new HyracksConnection(opts.clientNetIpAddress, opts.clientNetPort); + runQueries(); + } else { + if (!opts.compileOnly) { + startLocalHyracks(); + } + try { + runQueries(); + } finally { + if (!opts.compileOnly) { + stopLocalHyracks(); + } + } + } + } + + /** + * Reads the contents of the files passed in the list of arguments to a + * string. If -showquery argument is passed, output the query as string. Run + * the query for the string. + * + * @throws IOException + * @throws SystemException + * @throws Exception + */ + private void runQueries() throws IOException, SystemException, Exception { + Date start = null; + Date end = null; + for (String query : opts.arguments) { + String qStr = slurp(query); + if (opts.showQuery) { + System.err.println(qStr); + } + + VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET, + opts.showOET, opts.showRP); + start = opts.timing ? 
new Date() : null; + XMLQueryCompiler compiler = new XMLQueryCompiler(listener, getNodeList(), opts.frameSize, + opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf); + resultSetId = createResultSetId(); + CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE), + resultSetId, null); + compiler.compile(query, new StringReader(qStr), ccb, opts.optimizationLevel); + // if -timing argument passed, show the starting and ending times + if (opts.timing) { + end = new Date(); + timingMessage("Compile time: " + (end.getTime() - start.getTime()) + " ms"); + } + if (opts.compileOnly) { + continue; + } + + Module module = compiler.getModule(); + JobSpecification js = module.getHyracksJobSpecification(); + js.setProperty("hdfsConf", opts.hdfsConf); + + DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext()); + js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory())); + OutputStream resultStream = System.out; + if (opts.resultFile != null) { + resultStream = new FileOutputStream(new File(opts.resultFile)); + } + + PrintWriter writer = new PrintWriter(resultStream, true); + // Repeat execution for number of times provided in -repeatexec + // argument + for (int i = 0; i < opts.repeatExec; ++i) { + start = opts.timing ? new Date() : null; + runJob(js, writer); + // if -timing argument passed, show the starting and ending + // times + if (opts.timing) { + end = new Date(); + long currentRun = end.getTime() - start.getTime(); + if ((i + 1) > opts.timingIgnoreQueries) { + sumTiming += currentRun; + sumSquaredTiming += currentRun * currentRun; + if (currentRun < minTiming) { + minTiming = currentRun; + } + if (maxTiming < currentRun) { + maxTiming = currentRun; + } + } + timingMessage("Job (" + (i + 1) + ") execution time: " + currentRun + " ms"); + } + } + } + } + + /** + * Get cluster node configuration. 
+ * + * @return Configuration of node controllers as array of Strings. + * @throws Exception + */ + private String[] getNodeList() throws Exception { + Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos(); + String[] nodeList = new String[nodeControllerInfos.size()]; + int index = 0; + for (String node : nodeControllerInfos.keySet()) { + nodeList[index++] = node; + } + return nodeList; + } + + /** + * Creates a Hyracks dataset, if not already existing with the job frame + * size, and 1 reader. Allocates a new buffer of size specified in the frame + * of Hyracks node. Creates new dataset reader with the current job ID and + * result set ID. Outputs the string in buffer for each frame. + * + * @param spec + * JobSpecification object, containing frame size. Current + * specified job. + * @param writer + * Writer for output of job. + * @throws Exception + */ + private void runJob(JobSpecification spec, PrintWriter writer) throws Exception { + int nReaders = 1; + if (hds == null) { + hds = new HyracksDataset(hcc, spec.getFrameSize(), nReaders); + } + + JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); + + ByteBuffer buffer = ByteBuffer.allocate(spec.getFrameSize()); + IHyracksDatasetReader reader = hds.createReader(jobId, resultSetId); + IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize()); + buffer.clear(); + + while (reader.read(buffer) > 0) { + buffer.clear(); + writer.print(ResultUtils.getStringFromBuffer(buffer, frameTupleAccessor)); + writer.flush(); + } + + hcc.waitForCompletion(jobId); + } + + /** + * Create a unique result set id to get the correct query back from the + * cluster. + * + * @return Result Set id generated with current system time. + */ + protected ResultSetId createResultSetId() { + return new ResultSetId(System.nanoTime()); + } + + /** + * Start local virtual cluster with cluster controller node and node + * controller nodes. 
IP address provided for node controller is localhost. + * Unassigned ports 39000 and 39001 are used for client and cluster port + * respectively. Creates a new Hyracks connection with the IP address and + * client ports. + * + * @throws Exception + */ + public void startLocalHyracks() throws Exception { + CCConfig ccConfig = new CCConfig(); + ccConfig.clientNetIpAddress = "127.0.0.1"; + ccConfig.clientNetPort = 39000; + ccConfig.clusterNetIpAddress = "127.0.0.1"; + ccConfig.clusterNetPort = 39001; + ccConfig.httpPort = 39002; + ccConfig.profileDumpPeriod = 10000; + cc = new ClusterControllerService(ccConfig); + cc.start(); + + ncs = new NodeControllerService[opts.localNodeControllers]; + for (int i = 0; i < ncs.length; i++) { + NCConfig ncConfig = new NCConfig(); + ncConfig.ccHost = "localhost"; + ncConfig.ccPort = 39001; + ncConfig.clusterNetIPAddress = "127.0.0.1"; + ncConfig.dataIPAddress = "127.0.0.1"; + ncConfig.resultIPAddress = "127.0.0.1"; + ncConfig.nodeId = "nc" + (i + 1); + ncConfig.ioDevices = Files.createTempDirectory(ncConfig.nodeId).toString(); + ncs[i] = new NodeControllerService(ncConfig); + ncs[i].start(); + } + + hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); + } + + /** + * Shuts down the virtual cluster, along with all nodes and node execution, + * network and queue managers. + * + * @throws Exception + */ + public void stopLocalHyracks() throws Exception { + for (int i = 0; i < ncs.length; i++) { + ncs[i].stop(); + } + cc.stop(); + } + + /** + * Reads the contents of file given in query into a String. The file is + * always closed. For XML files UTF-8 encoding is used. + * + * @param query + * The query with filename to be processed + * @return UTF-8 formatted query string + * @throws IOException + */ + private static String slurp(String query) throws IOException { + return FileUtils.readFileToString(new File(query), "UTF-8"); + } + + /** + * Save and print out the timing message. 
+ * + * @param message + */ + private static void timingMessage(String message) { + System.out.println(message); + timingMessages.add(message); + } + + /** + * Helper class with fields and methods to handle all command line options + */ + private static class CmdLineOptions { + @Option(name = "-available-processors", usage = "Number of available processors. (default: java's available processors)") + private int availableProcessors = -1; + + @Option(name = "-client-net-ip-address", usage = "IP Address of the ClusterController.") + private String clientNetIpAddress = null; + + @Option(name = "-client-net-port", usage = "Port of the ClusterController. (default: 1098)") + private int clientNetPort = 1098; + + @Option(name = "-local-node-controllers", usage = "Number of local node controllers. (default: 1)") + private int localNodeControllers = 1; + + @Option(name = "-frame-size", usage = "Frame size in bytes. (default: 65,536)") + private int frameSize = 65536; + + @Option(name = "-join-hash-size", usage = "Join hash size in bytes. (default: 67,108,864)") + private long joinHashSize = -1; + + @Option(name = "-maximum-data-size", usage = "Maximum possible data size in bytes. (default: 150,323,855,000)") + private long maximumDataSize = -1; + + @Option(name = "-buffer-size", usage = "Disk read buffer size in bytes.") + private int bufferSize = -1; + + @Option(name = "-O", usage = "Optimization Level. 
(default: Full Optimization)") + private int optimizationLevel = Integer.MAX_VALUE; + + @Option(name = "-showquery", usage = "Show query string.") + private boolean showQuery; + + @Option(name = "-showast", usage = "Show abstract syntax tree.") + private boolean showAST; + + @Option(name = "-showtet", usage = "Show translated expression tree.") + private boolean showTET; + + @Option(name = "-showoet", usage = "Show optimized expression tree.") + private boolean showOET; + + @Option(name = "-showrp", usage = "Show Runtime plan.") + private boolean showRP; + + @Option(name = "-compileonly", usage = "Compile the query and stop.") + private boolean compileOnly; + + @Option(name = "-repeatexec", usage = "Number of times to repeat execution.") + private int repeatExec = 1; + + @Option(name = "-result-file", usage = "File path to save the query result.") + private String resultFile = null; + + @Option(name = "-timing", usage = "Produce timing information.") + private boolean timing; + + @Option(name = "-timing-ignore-queries", usage = "Ignore the first X number of quereies.") + private int timingIgnoreQueries = 2; + + @Option(name = "-x", usage = "Bind an external variable") + private Map<String, String> bindings = new HashMap<String, String>(); + + @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files") + private String hdfsConf = null; + + @Argument + private List<String> arguments = new ArrayList<String>(); + } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java index ed3d5ac..0695e00 100644 --- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java @@ -77,7 +77,7 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { } AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; if (!functionCall.getFunctionIdentifier().equals( - BuiltinFunctions.FN_COLLECTIONWITHTAG_2.getFunctionIdentifier()) + BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier()) && !functionCall.getFunctionIdentifier().equals( BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) { return null; http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml index 2b2be80..3b9371d 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml @@ -129,7 +129,7 @@ </function> <!-- fn:collection-with-tag($arg1 as xs:string?, $arg2 as xs:string?) 
as node()* --> - <function name="fn:collectionwithtag"> + <function name="fn:collection-with-tag"> <param name="arg1" type="xs:string?"/> <param name="arg2" type="xs:string?"/> <return type="node()*"/> http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index 3262b81..6b57641 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -76,16 +76,17 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private HDFSFunctions hdfs; private String tag; private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n"; + private final String hdfsConf; public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, - RecordDescriptor rDesc) { + RecordDescriptor rDesc, String hdfsConf) { super(spec, 1, 1); collectionPartitions = ds.getPartitions(); dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); recordDescriptors[0] = rDesc; - this.tag = ds.getTag(); + this.hdfsConf = hdfsConf; } @Override http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java index ea4bdb1..47f9e06 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java @@ -57,11 +57,14 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String private final String[] nodeList; private final Map<String, File> sourceFileMap; private final StaticContext staticCtx; + private final String hdfsConf; - public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx) { + public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx, + String hdfsConf) { this.nodeList = nodeList; this.sourceFileMap = sourceFileMap; this.staticCtx = staticCtx; + this.hdfsConf = hdfsConf; } @Override @@ -95,7 +98,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String ds.setPartitions(collectionPartitions); } RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); - IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc); + IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf); AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount()); return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint); http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java index aea6ef7..e466b7a 100644 --- 
a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java @@ -76,6 +76,8 @@ public class XMLQueryCompiler { private final XQueryCompilationListener listener; private final ICompilerFactory cFactory; + + private final String hdfsConf; private LogicalOperatorPrettyPrintVisitor pprinter; @@ -88,16 +90,18 @@ public class XMLQueryCompiler { private int frameSize; private String[] nodeList; + public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize) { - this(listener, nodeList, frameSize, -1, -1, -1); + this(listener, nodeList, frameSize, -1, -1, -1, ""); } public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize, - int availableProcessors, long joinHashSize, long maximumDataSize) { + int availableProcessors, long joinHashSize, long maximumDataSize, String hdfsConf) { this.listener = listener == null ? NoopXQueryCompilationListener.INSTANCE : listener; this.frameSize = frameSize; this.nodeList = nodeList; + this.hdfsConf = hdfsConf; HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder( new IOptimizationContextFactory() { @Override @@ -200,7 +204,7 @@ public class XMLQueryCompiler { pprinter = new LogicalOperatorPrettyPrintVisitor(new VXQueryLogicalExpressionPrettyPrintVisitor( module.getModuleContext())); VXQueryMetadataProvider mdProvider = new VXQueryMetadataProvider(nodeList, ccb.getSourceFileMap(), - module.getModuleContext()); + module.getModuleContext(), this.hdfsConf); compiler = cFactory.createCompiler(module.getBody(), mdProvider, 0); listener.notifyTranslationResult(module); XMLQueryTypeChecker.typeCheckModule(module); http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq ---------------------------------------------------------------------- diff --git 
a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq index 30006af..9d04916 100644 --- a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq +++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq @@ -5,9 +5,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,6 +18,6 @@ (: XQuery Aggregate Query :) (: Find the max value. :) fn:max( - for $r in collectionwithtag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", "data")/data + for $r in collection-with-tag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", "data")/data return $r/value )
