Add hadoop-conf as a variable; change collectionwithtag to collection-with-tag


Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/4bd85a17
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/4bd85a17
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/4bd85a17

Branch: refs/heads/steven/hdfs
Commit: 4bd85a17ad610a8f5f1195bc7bc9e3e56d7b4acb
Parents: c85e556
Author: efikalti <[email protected]>
Authored: Fri Feb 26 14:16:48 2016 +0200
Committer: efikalti <[email protected]>
Committed: Fri Feb 26 14:16:48 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/vxquery/cli/VXQuery.java    | 739 ++++++++++---------
 .../rewriter/rules/AbstractCollectionRule.java  |   2 +-
 .../vxquery/functions/builtin-functions.xml     |   2 +-
 .../VXQueryCollectionOperatorDescriptor.java    |   5 +-
 .../metadata/VXQueryMetadataProvider.java       |   7 +-
 .../xmlquery/query/XMLQueryCompiler.java        |  10 +-
 .../XQuery/HDFS/Aggregate/maxvalueHDFS.xq       |   6 +-
 7 files changed, 398 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
----------------------------------------------------------------------
diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java 
b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
index 0e88539..db95468 100644
--- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
+++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
@@ -63,366 +63,383 @@ import 
edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 
 public class VXQuery {
-    private final CmdLineOptions opts;
-
-    private ClusterControllerService cc;
-    private NodeControllerService[] ncs;
-    private IHyracksClientConnection hcc;
-    private IHyracksDataset hds;
-
-    private ResultSetId resultSetId;
-    private static List<String> timingMessages = new ArrayList<String>();
-    private static long sumTiming;
-    private static long sumSquaredTiming;
-    private static long minTiming = Long.MAX_VALUE;
-    private static long maxTiming = Long.MIN_VALUE;
-
-    /**
-     * Constructor to use command line options passed.
-     *
-     * @param opts
-     *            Command line options object
-     */
-    public VXQuery(CmdLineOptions opts) {
-        this.opts = opts;
-    }
-
-    /**
-     * Main method to get command line options and execute query process.
-     *
-     * @param args
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        Date start = new Date();
-        final CmdLineOptions opts = new CmdLineOptions();
-        CmdLineParser parser = new CmdLineParser(opts);
-
-        // parse command line options, give error message if no arguments 
passed
-        try {
-            parser.parseArgument(args);
-        } catch (Exception e) {
-            parser.printUsage(System.err);
-            return;
-        }
-        if (opts.arguments.isEmpty()) {
-            parser.printUsage(System.err);
-            return;
-        }
-        VXQuery vxq = new VXQuery(opts);
-        vxq.execute();
-        // if -timing argument passed, show the starting and ending times
-        if (opts.timing) {
-            Date end = new Date();
-            timingMessage("Execution time: " + (end.getTime() - 
start.getTime()) + " ms");
-            if (opts.repeatExec > opts.timingIgnoreQueries) {
-                long mean = sumTiming / (opts.repeatExec - 
opts.timingIgnoreQueries);
-                double sd = Math
-                        .sqrt(sumSquaredTiming / (opts.repeatExec - new 
Integer(opts.timingIgnoreQueries).doubleValue())
-                                - mean * mean);
-                timingMessage("Average execution time: " + mean + " ms");
-                timingMessage("Standard deviation: " + String.format("%.4f", 
sd));
-                timingMessage("Coefficient of variation: " + 
String.format("%.4f", (sd / mean)));
-                timingMessage("Minimum execution time: " + minTiming + " ms");
-                timingMessage("Maximum execution time: " + maxTiming + " ms");
-            }
-            System.out.println("Timing Summary:");
-            for (String time : timingMessages) {
-                System.out.println("  " + time);
-            }
-        }
-
-    }
-
-    /**
-     * Creates a new Hyracks connection with: the client IP address and port 
provided, if IP address is provided in command line. Otherwise create a new 
virtual
-     * cluster with Hyracks nodes. Queries passed are run either way. After 
running queries, if a virtual cluster has been created, it is shut down.
-     *
-     * @throws Exception
-     */
-    private void execute() throws Exception {
-        System.setProperty("vxquery.buffer_size", 
Integer.toString(opts.bufferSize));
-
-        if (opts.clientNetIpAddress != null) {
-            hcc = new HyracksConnection(opts.clientNetIpAddress, 
opts.clientNetPort);
-            runQueries();
-        } else {
-            if (!opts.compileOnly) {
-                startLocalHyracks();
-            }
-            try {
-                runQueries();
-            } finally {
-                if (!opts.compileOnly) {
-                    stopLocalHyracks();
-                }
-            }
-        }
-    }
-
-    /**
-     * Reads the contents of the files passed in the list of arguments to a 
string. If -showquery argument is passed, output the query as string. Run the 
query
-     * for the string.
-     *
-     * @throws IOException
-     * @throws SystemException
-     * @throws Exception
-     */
-    private void runQueries() throws IOException, SystemException, Exception {
-        Date start = null;
-        Date end = null;
-        for (String query : opts.arguments) {
-            String qStr = slurp(query);
-            if (opts.showQuery) {
-                System.err.println(qStr);
-            }
-
-            VXQueryCompilationListener listener = new 
VXQueryCompilationListener(opts.showAST, opts.showTET,
-                    opts.showOET, opts.showRP);
-
-            start = opts.timing ? new Date() : null;
-            XMLQueryCompiler compiler = new XMLQueryCompiler(listener, 
getNodeList(), opts.frameSize,
-                    opts.availableProcessors, opts.joinHashSize, 
opts.maximumDataSize);
-            resultSetId = createResultSetId();
-            CompilerControlBlock ccb = new CompilerControlBlock(new 
StaticContextImpl(RootStaticContextImpl.INSTANCE),
-                    resultSetId, null);
-            compiler.compile(query, new StringReader(qStr), ccb, 
opts.optimizationLevel);
-            // if -timing argument passed, show the starting and ending times
-            if (opts.timing) {
-                end = new Date();
-                timingMessage("Compile time: " + (end.getTime() - 
start.getTime()) + " ms");
-            }
-            if (opts.compileOnly) {
-                continue;
-            }
-
-            Module module = compiler.getModule();
-            JobSpecification js = module.getHyracksJobSpecification();
-
-            DynamicContext dCtx = new 
DynamicContextImpl(module.getModuleContext());
-            js.setGlobalJobDataFactory(new 
VXQueryGlobalDataFactory(dCtx.createFactory()));
-
-            OutputStream resultStream = System.out;
-            if (opts.resultFile != null) {
-                resultStream = new FileOutputStream(new File(opts.resultFile));
-            }
-
-            PrintWriter writer = new PrintWriter(resultStream, true);
-            // Repeat execution for number of times provided in -repeatexec 
argument
-            for (int i = 0; i < opts.repeatExec; ++i) {
-                start = opts.timing ? new Date() : null;
-                runJob(js, writer);
-                // if -timing argument passed, show the starting and ending 
times
-                if (opts.timing) {
-                    end = new Date();
-                    long currentRun = end.getTime() - start.getTime();
-                    if ((i + 1) > opts.timingIgnoreQueries) {
-                        sumTiming += currentRun;
-                        sumSquaredTiming += currentRun * currentRun;
-                        if (currentRun < minTiming) {
-                            minTiming = currentRun;
-                        }
-                        if (maxTiming < currentRun) {
-                            maxTiming = currentRun;
-                        }
-                    }
-                    timingMessage("Job (" + (i + 1) + ") execution time: " + 
currentRun + " ms");
-                }
-            }
-        }
-    }
-
-    /**
-     * Get cluster node configuration.
-     *
-     * @return Configuration of node controllers as array of Strings.
-     * @throws Exception
-     */
-    private String[] getNodeList() throws Exception {
-        Map<String, NodeControllerInfo> nodeControllerInfos = 
hcc.getNodeControllerInfos();
-        String[] nodeList = new String[nodeControllerInfos.size()];
-        int index = 0;
-        for (String node : nodeControllerInfos.keySet()) {
-            nodeList[index++] = node;
-        }
-        return nodeList;
-    }
-
-    /**
-     * Creates a Hyracks dataset, if not already existing with the job frame 
size, and 1 reader. Allocates a new buffer of size specified in the frame of 
Hyracks
-     * node. Creates new dataset reader with the current job ID and result set 
ID. Outputs the string in buffer for each frame.
-     *
-     * @param spec
-     *            JobSpecification object, containing frame size. Current 
specified job.
-     * @param writer
-     *            Writer for output of job.
-     * @throws Exception
-     */
-    private void runJob(JobSpecification spec, PrintWriter writer) throws 
Exception {
-        int nReaders = 1;
-        if (hds == null) {
-            hds = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
-        }
-
-        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-
-        ByteBuffer buffer = ByteBuffer.allocate(spec.getFrameSize());
-        IHyracksDatasetReader reader = hds.createReader(jobId, resultSetId);
-        IFrameTupleAccessor frameTupleAccessor = new 
ResultFrameTupleAccessor(spec.getFrameSize());
-        buffer.clear();
-
-        while (reader.read(buffer) > 0) {
-            buffer.clear();
-            writer.print(ResultUtils.getStringFromBuffer(buffer, 
frameTupleAccessor));
-            writer.flush();
-        }
-
-        hcc.waitForCompletion(jobId);
-    }
-
-    /**
-     * Create a unique result set id to get the correct query back from the 
cluster.
-     *
-     * @return Result Set id generated with current system time.
-     */
-    protected ResultSetId createResultSetId() {
-        return new ResultSetId(System.nanoTime());
-    }
-
-    /**
-     * Start local virtual cluster with cluster controller node and node 
controller nodes. IP address provided for node controller is localhost. 
Unassigned ports
-     * 39000 and 39001 are used for client and cluster port respectively. 
Creates a new Hyracks connection with the IP address and client ports.
-     *
-     * @throws Exception
-     */
-    public void startLocalHyracks() throws Exception {
-        CCConfig ccConfig = new CCConfig();
-        ccConfig.clientNetIpAddress = "127.0.0.1";
-        ccConfig.clientNetPort = 39000;
-        ccConfig.clusterNetIpAddress = "127.0.0.1";
-        ccConfig.clusterNetPort = 39001;
-        ccConfig.httpPort = 39002;
-        ccConfig.profileDumpPeriod = 10000;
-        cc = new ClusterControllerService(ccConfig);
-        cc.start();
-
-        ncs = new NodeControllerService[opts.localNodeControllers];
-        for (int i = 0; i < ncs.length; i++) {
-            NCConfig ncConfig = new NCConfig();
-            ncConfig.ccHost = "localhost";
-            ncConfig.ccPort = 39001;
-            ncConfig.clusterNetIPAddress = "127.0.0.1";
-            ncConfig.dataIPAddress = "127.0.0.1";
-            ncConfig.resultIPAddress = "127.0.0.1";
-            ncConfig.nodeId = "nc" + (i + 1);
-            ncConfig.ioDevices = 
Files.createTempDirectory(ncConfig.nodeId).toString();
-            ncs[i] = new NodeControllerService(ncConfig);
-            ncs[i].start();
-        }
-
-        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, 
ccConfig.clientNetPort);
-    }
-
-    /**
-     * Shuts down the virtual cluster, along with all nodes and node 
execution, network and queue managers.
-     *
-     * @throws Exception
-     */
-    public void stopLocalHyracks() throws Exception {
-        for (int i = 0; i < ncs.length; i++) {
-            ncs[i].stop();
-        }
-        cc.stop();
-    }
-
-    /**
-     * Reads the contents of file given in query into a String. The file is 
always closed. For XML files UTF-8 encoding is used.
-     *
-     * @param query
-     *            The query with filename to be processed
-     * @return UTF-8 formatted query string
-     * @throws IOException
-     */
-    private static String slurp(String query) throws IOException {
-        return FileUtils.readFileToString(new File(query), "UTF-8");
-    }
-
-    /**
-     * Save and print out the timing message.
-     *
-     * @param message
-     */
-    private static void timingMessage(String message) {
-        System.out.println(message);
-        timingMessages.add(message);
-    }
-
-    /**
-     * Helper class with fields and methods to handle all command line options
-     */
-    private static class CmdLineOptions {
-        @Option(name = "-available-processors", usage = "Number of available 
processors. (default: java's available processors)")
-        private int availableProcessors = -1;
-
-        @Option(name = "-client-net-ip-address", usage = "IP Address of the 
ClusterController.")
-        private String clientNetIpAddress = null;
-
-        @Option(name = "-client-net-port", usage = "Port of the 
ClusterController. (default: 1098)")
-        private int clientNetPort = 1098;
-
-        @Option(name = "-local-node-controllers", usage = "Number of local 
node controllers. (default: 1)")
-        private int localNodeControllers = 1;
-
-        @Option(name = "-frame-size", usage = "Frame size in bytes. (default: 
65,536)")
-        private int frameSize = 65536;
-
-        @Option(name = "-join-hash-size", usage = "Join hash size in bytes. 
(default: 67,108,864)")
-        private long joinHashSize = -1;
-
-        @Option(name = "-maximum-data-size", usage = "Maximum possible data 
size in bytes. (default: 150,323,855,000)")
-        private long maximumDataSize = -1;
-
-        @Option(name = "-buffer-size", usage = "Disk read buffer size in 
bytes.")
-        private int bufferSize = -1;
-
-        @Option(name = "-O", usage = "Optimization Level. (default: Full 
Optimization)")
-        private int optimizationLevel = Integer.MAX_VALUE;
-
-        @Option(name = "-showquery", usage = "Show query string.")
-        private boolean showQuery;
-
-        @Option(name = "-showast", usage = "Show abstract syntax tree.")
-        private boolean showAST;
-
-        @Option(name = "-showtet", usage = "Show translated expression tree.")
-        private boolean showTET;
-
-        @Option(name = "-showoet", usage = "Show optimized expression tree.")
-        private boolean showOET;
-
-        @Option(name = "-showrp", usage = "Show Runtime plan.")
-        private boolean showRP;
-
-        @Option(name = "-compileonly", usage = "Compile the query and stop.")
-        private boolean compileOnly;
-
-        @Option(name = "-repeatexec", usage = "Number of times to repeat 
execution.")
-        private int repeatExec = 1;
-
-        @Option(name = "-result-file", usage = "File path to save the query 
result.")
-        private String resultFile = null;
-
-        @Option(name = "-timing", usage = "Produce timing information.")
-        private boolean timing;
-
-        @Option(name = "-timing-ignore-queries", usage = "Ignore the first X 
number of quereies.")
-        private int timingIgnoreQueries = 2;
-
-        @Option(name = "-x", usage = "Bind an external variable")
-        private Map<String, String> bindings = new HashMap<String, String>();
-
-        @Argument
-        private List<String> arguments = new ArrayList<String>();
-    }
+       private final CmdLineOptions opts;
+
+       private ClusterControllerService cc;
+       private NodeControllerService[] ncs;
+       private IHyracksClientConnection hcc;
+       private IHyracksDataset hds;
+
+       private ResultSetId resultSetId;
+       private static List<String> timingMessages = new ArrayList<String>();
+       private static long sumTiming;
+       private static long sumSquaredTiming;
+       private static long minTiming = Long.MAX_VALUE;
+       private static long maxTiming = Long.MIN_VALUE;
+
+       /**
+        * Constructor to use command line options passed.
+        *
+        * @param opts
+        *            Command line options object
+        */
+       public VXQuery(CmdLineOptions opts) {
+               this.opts = opts;
+       }
+
+       /**
+        * Main method to get command line options and execute query process.
+        *
+        * @param args
+        * @throws Exception
+        */
+       public static void main(String[] args) throws Exception {
+               Date start = new Date();
+               final CmdLineOptions opts = new CmdLineOptions();
+               CmdLineParser parser = new CmdLineParser(opts);
+
+               // parse command line options, give error message if no 
arguments passed
+               try {
+                       parser.parseArgument(args);
+               } catch (Exception e) {
+                       parser.printUsage(System.err);
+                       return;
+               }
+               if (opts.arguments.isEmpty()) {
+                       parser.printUsage(System.err);
+                       return;
+               }
+               VXQuery vxq = new VXQuery(opts);
+               vxq.execute();
+               // if -timing argument passed, show the starting and ending 
times
+               if (opts.timing) {
+                       Date end = new Date();
+                       timingMessage("Execution time: " + (end.getTime() - 
start.getTime()) + " ms");
+                       if (opts.repeatExec > opts.timingIgnoreQueries) {
+                               long mean = sumTiming / (opts.repeatExec - 
opts.timingIgnoreQueries);
+                               double sd = Math
+                                               .sqrt(sumSquaredTiming / 
(opts.repeatExec - new Integer(opts.timingIgnoreQueries).doubleValue())
+                                                               - mean * mean);
+                               timingMessage("Average execution time: " + mean 
+ " ms");
+                               timingMessage("Standard deviation: " + 
String.format("%.4f", sd));
+                               timingMessage("Coefficient of variation: " + 
String.format("%.4f", (sd / mean)));
+                               timingMessage("Minimum execution time: " + 
minTiming + " ms");
+                               timingMessage("Maximum execution time: " + 
maxTiming + " ms");
+                       }
+                       System.out.println("Timing Summary:");
+                       for (String time : timingMessages) {
+                               System.out.println("  " + time);
+                       }
+               }
+
+       }
+
+       /**
+        * Creates a new Hyracks connection with: the client IP address and port
+        * provided, if IP address is provided in command line. Otherwise 
create a
+        * new virtual cluster with Hyracks nodes. Queries passed are run either
+        * way. After running queries, if a virtual cluster has been created, 
it is
+        * shut down.
+        *
+        * @throws Exception
+        */
+       private void execute() throws Exception {
+               System.setProperty("vxquery.buffer_size", 
Integer.toString(opts.bufferSize));
+
+               if (opts.clientNetIpAddress != null) {
+                       hcc = new HyracksConnection(opts.clientNetIpAddress, 
opts.clientNetPort);
+                       runQueries();
+               } else {
+                       if (!opts.compileOnly) {
+                               startLocalHyracks();
+                       }
+                       try {
+                               runQueries();
+                       } finally {
+                               if (!opts.compileOnly) {
+                                       stopLocalHyracks();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Reads the contents of the files passed in the list of arguments to a
+        * string. If -showquery argument is passed, output the query as 
string. Run
+        * the query for the string.
+        *
+        * @throws IOException
+        * @throws SystemException
+        * @throws Exception
+        */
+       private void runQueries() throws IOException, SystemException, 
Exception {
+               Date start = null;
+               Date end = null;
+               for (String query : opts.arguments) {
+                       String qStr = slurp(query);
+                       if (opts.showQuery) {
+                               System.err.println(qStr);
+                       }
+
+                       VXQueryCompilationListener listener = new 
VXQueryCompilationListener(opts.showAST, opts.showTET,
+                                       opts.showOET, opts.showRP);
+                       start = opts.timing ? new Date() : null;
+                       XMLQueryCompiler compiler = new 
XMLQueryCompiler(listener, getNodeList(), opts.frameSize,
+                                       opts.availableProcessors, 
opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf);
+                       resultSetId = createResultSetId();
+                       CompilerControlBlock ccb = new CompilerControlBlock(new 
StaticContextImpl(RootStaticContextImpl.INSTANCE),
+                                       resultSetId, null);
+                       compiler.compile(query, new StringReader(qStr), ccb, 
opts.optimizationLevel);
+                       // if -timing argument passed, show the starting and 
ending times
+                       if (opts.timing) {
+                               end = new Date();
+                               timingMessage("Compile time: " + (end.getTime() 
- start.getTime()) + " ms");
+                       }
+                       if (opts.compileOnly) {
+                               continue;
+                       }
+
+                       Module module = compiler.getModule();
+                       JobSpecification js = 
module.getHyracksJobSpecification();
+                       js.setProperty("hdfsConf", opts.hdfsConf);
+
+                       DynamicContext dCtx = new 
DynamicContextImpl(module.getModuleContext());
+                       js.setGlobalJobDataFactory(new 
VXQueryGlobalDataFactory(dCtx.createFactory()));
+                       OutputStream resultStream = System.out;
+                       if (opts.resultFile != null) {
+                               resultStream = new FileOutputStream(new 
File(opts.resultFile));
+                       }
+
+                       PrintWriter writer = new PrintWriter(resultStream, 
true);
+                       // Repeat execution for number of times provided in 
-repeatexec
+                       // argument
+                       for (int i = 0; i < opts.repeatExec; ++i) {
+                               start = opts.timing ? new Date() : null;
+                               runJob(js, writer);
+                               // if -timing argument passed, show the 
starting and ending
+                               // times
+                               if (opts.timing) {
+                                       end = new Date();
+                                       long currentRun = end.getTime() - 
start.getTime();
+                                       if ((i + 1) > opts.timingIgnoreQueries) 
{
+                                               sumTiming += currentRun;
+                                               sumSquaredTiming += currentRun 
* currentRun;
+                                               if (currentRun < minTiming) {
+                                                       minTiming = currentRun;
+                                               }
+                                               if (maxTiming < currentRun) {
+                                                       maxTiming = currentRun;
+                                               }
+                                       }
+                                       timingMessage("Job (" + (i + 1) + ") 
execution time: " + currentRun + " ms");
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Get cluster node configuration.
+        *
+        * @return Configuration of node controllers as array of Strings.
+        * @throws Exception
+        */
+       private String[] getNodeList() throws Exception {
+               Map<String, NodeControllerInfo> nodeControllerInfos = 
hcc.getNodeControllerInfos();
+               String[] nodeList = new String[nodeControllerInfos.size()];
+               int index = 0;
+               for (String node : nodeControllerInfos.keySet()) {
+                       nodeList[index++] = node;
+               }
+               return nodeList;
+       }
+
+       /**
+        * Creates a Hyracks dataset, if not already existing with the job frame
+        * size, and 1 reader. Allocates a new buffer of size specified in the 
frame
+        * of Hyracks node. Creates new dataset reader with the current job ID 
and
+        * result set ID. Outputs the string in buffer for each frame.
+        *
+        * @param spec
+        *            JobSpecification object, containing frame size. Current
+        *            specified job.
+        * @param writer
+        *            Writer for output of job.
+        * @throws Exception
+        */
+       private void runJob(JobSpecification spec, PrintWriter writer) throws 
Exception {
+               int nReaders = 1;
+               if (hds == null) {
+                       hds = new HyracksDataset(hcc, spec.getFrameSize(), 
nReaders);
+               }
+
+               JobId jobId = hcc.startJob(spec, 
EnumSet.of(JobFlag.PROFILE_RUNTIME));
+
+               ByteBuffer buffer = ByteBuffer.allocate(spec.getFrameSize());
+               IHyracksDatasetReader reader = hds.createReader(jobId, 
resultSetId);
+               IFrameTupleAccessor frameTupleAccessor = new 
ResultFrameTupleAccessor(spec.getFrameSize());
+               buffer.clear();
+
+               while (reader.read(buffer) > 0) {
+                       buffer.clear();
+                       writer.print(ResultUtils.getStringFromBuffer(buffer, 
frameTupleAccessor));
+                       writer.flush();
+               }
+
+               hcc.waitForCompletion(jobId);
+       }
+
+       /**
+        * Create a unique result set id to get the correct query back from the
+        * cluster.
+        *
+        * @return Result Set id generated with current system time.
+        */
+       protected ResultSetId createResultSetId() {
+               return new ResultSetId(System.nanoTime());
+       }
+
+       /**
+        * Start local virtual cluster with cluster controller node and node
+        * controller nodes. IP address provided for node controller is 
localhost.
+        * Unassigned ports 39000 and 39001 are used for client and cluster port
+        * respectively. Creates a new Hyracks connection with the IP address 
and
+        * client ports.
+        *
+        * @throws Exception
+        */
+       public void startLocalHyracks() throws Exception {
+               CCConfig ccConfig = new CCConfig();
+               ccConfig.clientNetIpAddress = "127.0.0.1";
+               ccConfig.clientNetPort = 39000;
+               ccConfig.clusterNetIpAddress = "127.0.0.1";
+               ccConfig.clusterNetPort = 39001;
+               ccConfig.httpPort = 39002;
+               ccConfig.profileDumpPeriod = 10000;
+               cc = new ClusterControllerService(ccConfig);
+               cc.start();
+
+               ncs = new NodeControllerService[opts.localNodeControllers];
+               for (int i = 0; i < ncs.length; i++) {
+                       NCConfig ncConfig = new NCConfig();
+                       ncConfig.ccHost = "localhost";
+                       ncConfig.ccPort = 39001;
+                       ncConfig.clusterNetIPAddress = "127.0.0.1";
+                       ncConfig.dataIPAddress = "127.0.0.1";
+                       ncConfig.resultIPAddress = "127.0.0.1";
+                       ncConfig.nodeId = "nc" + (i + 1);
+                       ncConfig.ioDevices = 
Files.createTempDirectory(ncConfig.nodeId).toString();
+                       ncs[i] = new NodeControllerService(ncConfig);
+                       ncs[i].start();
+               }
+
+               hcc = new HyracksConnection(ccConfig.clientNetIpAddress, 
ccConfig.clientNetPort);
+       }
+
+       /**
+        * Shuts down the virtual cluster, along with all nodes and node 
execution,
+        * network and queue managers.
+        *
+        * @throws Exception
+        */
+       public void stopLocalHyracks() throws Exception {
+               for (int i = 0; i < ncs.length; i++) {
+                       ncs[i].stop();
+               }
+               cc.stop();
+       }
+
+       /**
+        * Reads the contents of file given in query into a String. The file is
+        * always closed. For XML files UTF-8 encoding is used.
+        *
+        * @param query
+        *            The query with filename to be processed
+        * @return UTF-8 formatted query string
+        * @throws IOException
+        */
+       private static String slurp(String query) throws IOException {
+               return FileUtils.readFileToString(new File(query), "UTF-8");
+       }
+
+       /**
+        * Save and print out the timing message.
+        *
+        * @param message
+        */
+       private static void timingMessage(String message) {
+               System.out.println(message);
+               timingMessages.add(message);
+       }
+
+       /**
+        * Helper class with fields and methods to handle all command line 
options
+        */
+       private static class CmdLineOptions {
+               @Option(name = "-available-processors", usage = "Number of 
available processors. (default: java's available processors)")
+               private int availableProcessors = -1;
+
+               @Option(name = "-client-net-ip-address", usage = "IP Address of 
the ClusterController.")
+               private String clientNetIpAddress = null;
+
+               @Option(name = "-client-net-port", usage = "Port of the 
ClusterController. (default: 1098)")
+               private int clientNetPort = 1098;
+
+               @Option(name = "-local-node-controllers", usage = "Number of 
local node controllers. (default: 1)")
+               private int localNodeControllers = 1;
+
+               @Option(name = "-frame-size", usage = "Frame size in bytes. 
(default: 65,536)")
+               private int frameSize = 65536;
+
+               @Option(name = "-join-hash-size", usage = "Join hash size in 
bytes. (default: 67,108,864)")
+               private long joinHashSize = -1;
+
+               @Option(name = "-maximum-data-size", usage = "Maximum possible 
data size in bytes. (default: 150,323,855,000)")
+               private long maximumDataSize = -1;
+
+               @Option(name = "-buffer-size", usage = "Disk read buffer size 
in bytes.")
+               private int bufferSize = -1;
+
+               @Option(name = "-O", usage = "Optimization Level. (default: 
Full Optimization)")
+               private int optimizationLevel = Integer.MAX_VALUE;
+
+               @Option(name = "-showquery", usage = "Show query string.")
+               private boolean showQuery;
+
+               @Option(name = "-showast", usage = "Show abstract syntax tree.")
+               private boolean showAST;
+
+               @Option(name = "-showtet", usage = "Show translated expression 
tree.")
+               private boolean showTET;
+
+               @Option(name = "-showoet", usage = "Show optimized expression 
tree.")
+               private boolean showOET;
+
+               @Option(name = "-showrp", usage = "Show Runtime plan.")
+               private boolean showRP;
+
+               @Option(name = "-compileonly", usage = "Compile the query and 
stop.")
+               private boolean compileOnly;
+
+               @Option(name = "-repeatexec", usage = "Number of times to 
repeat execution.")
+               private int repeatExec = 1;
+
+               @Option(name = "-result-file", usage = "File path to save the 
query result.")
+               private String resultFile = null;
+
+               @Option(name = "-timing", usage = "Produce timing information.")
+               private boolean timing;
+
+               @Option(name = "-timing-ignore-queries", usage = "Ignore the 
first X number of queries.")
+               private int timingIgnoreQueries = 2;
+
+               @Option(name = "-x", usage = "Bind an external variable")
+               private Map<String, String> bindings = new HashMap<String, 
String>();
+
+               @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop 
configuration files")
+               private String hdfsConf = null;
+
+               @Argument
+               private List<String> arguments = new ArrayList<String>();
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
index ed3d5ac..0695e00 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
@@ -77,7 +77,7 @@ public abstract class AbstractCollectionRule implements 
IAlgebraicRewriteRule {
         }
         AbstractFunctionCallExpression functionCall = 
(AbstractFunctionCallExpression) logicalExpression;
         if (!functionCall.getFunctionIdentifier().equals(
-                
BuiltinFunctions.FN_COLLECTIONWITHTAG_2.getFunctionIdentifier())
+                
BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier())
                 && !functionCall.getFunctionIdentifier().equals(
                         
BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) {
             return null;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index 2b2be80..3b9371d 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -129,7 +129,7 @@
     </function>
     
     <!-- fn:collection-with-tag($arg1  as xs:string?, $arg2 as xs:string?) as  
node()* -->
-    <function name="fn:collectionwithtag">
+    <function name="fn:collection-with-tag">
         <param name="arg1" type="xs:string?"/>
         <param name="arg2" type="xs:string?"/>
         <return type="node()*"/>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index 3262b81..6b57641 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -76,16 +76,17 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
     private HDFSFunctions hdfs;
     private String tag;
     private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" 
standalone=\"yes\"?>\n";
+    private final String hdfsConf;
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry 
spec, VXQueryCollectionDataSource ds,
-            RecordDescriptor rDesc) {
+            RecordDescriptor rDesc, String hdfsConf) {
         super(spec, 1, 1);
         collectionPartitions = ds.getPartitions();
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
         recordDescriptors[0] = rDesc;
-        this.tag = ds.getTag();
+        this.hdfsConf = hdfsConf;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index ea4bdb1..47f9e06 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -57,11 +57,14 @@ public class VXQueryMetadataProvider implements 
IMetadataProvider<String, String
     private final String[] nodeList;
     private final Map<String, File> sourceFileMap;
     private final StaticContext staticCtx;
+    private final String hdfsConf;
 
-    public VXQueryMetadataProvider(String[] nodeList, Map<String, File> 
sourceFileMap, StaticContext staticCtx) {
+    public VXQueryMetadataProvider(String[] nodeList, Map<String, File> 
sourceFileMap, StaticContext staticCtx,
+               String hdfsConf) {
         this.nodeList = nodeList;
         this.sourceFileMap = sourceFileMap;
         this.staticCtx = staticCtx;
+        this.hdfsConf = hdfsConf;
     }
 
     @Override
@@ -95,7 +98,7 @@ public class VXQueryMetadataProvider implements 
IMetadataProvider<String, String
             ds.setPartitions(collectionPartitions);
         }
         RecordDescriptor rDesc = new RecordDescriptor(new 
ISerializerDeserializer[opSchema.getSize()]);
-        IOperatorDescriptor scanner = new 
VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc);
+        IOperatorDescriptor scanner = new 
VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf);
 
         AlgebricksPartitionConstraint constraint = 
getClusterLocations(nodeList, ds.getPartitionCount());
         return new Pair<IOperatorDescriptor, 
AlgebricksPartitionConstraint>(scanner, constraint);

http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
index aea6ef7..e466b7a 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
@@ -76,6 +76,8 @@ public class XMLQueryCompiler {
     private final XQueryCompilationListener listener;
 
     private final ICompilerFactory cFactory;
+    
+    private final String hdfsConf;
 
     private LogicalOperatorPrettyPrintVisitor pprinter;
 
@@ -88,16 +90,18 @@ public class XMLQueryCompiler {
     private int frameSize;
 
     private String[] nodeList;
+    
 
     public XMLQueryCompiler(XQueryCompilationListener listener, String[] 
nodeList, int frameSize) {
-        this(listener, nodeList, frameSize, -1, -1, -1);
+        this(listener, nodeList, frameSize, -1, -1, -1, "");
     }
 
     public XMLQueryCompiler(XQueryCompilationListener listener, String[] 
nodeList, int frameSize,
-            int availableProcessors, long joinHashSize, long maximumDataSize) {
+            int availableProcessors, long joinHashSize, long maximumDataSize, 
String hdfsConf) {
         this.listener = listener == null ? 
NoopXQueryCompilationListener.INSTANCE : listener;
         this.frameSize = frameSize;
         this.nodeList = nodeList;
+        this.hdfsConf = hdfsConf;
         HeuristicCompilerFactoryBuilder builder = new 
HeuristicCompilerFactoryBuilder(
                 new IOptimizationContextFactory() {
                     @Override
@@ -200,7 +204,7 @@ public class XMLQueryCompiler {
         pprinter = new LogicalOperatorPrettyPrintVisitor(new 
VXQueryLogicalExpressionPrettyPrintVisitor(
                 module.getModuleContext()));
         VXQueryMetadataProvider mdProvider = new 
VXQueryMetadataProvider(nodeList, ccb.getSourceFileMap(),
-                module.getModuleContext());
+                module.getModuleContext(), this.hdfsConf);
         compiler = cFactory.createCompiler(module.getBody(), mdProvider, 0);
         listener.notifyTranslationResult(module);
         XMLQueryTypeChecker.typeCheckModule(module);

http://git-wip-us.apache.org/repos/asf/vxquery/blob/4bd85a17/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
----------------------------------------------------------------------
diff --git 
a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
 
b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
index 30006af..9d04916 100644
--- 
a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
+++ 
b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
@@ -5,9 +5,9 @@
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
-   
+
      http://www.apache.org/licenses/LICENSE-2.0
-   
+
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,6 @@
 (: XQuery Aggregate Query :)
 (: Find the max value.                                            :)
 fn:max(
-    for $r in 
collectionwithtag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", 
"data")/data
+    for $r in 
collection-with-tag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", 
"data")/data
     return $r/value
 )

Reply via email to