Fixed EVERYTHING
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/66ff50ae Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/66ff50ae Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/66ff50ae Branch: refs/heads/steven/hdfs Commit: 66ff50ae90bbafc5a47cf610ec5bef2e8e57b19a Parents: f90d587 Author: Steven Jacobs <[email protected]> Authored: Tue May 17 16:08:32 2016 -0700 Committer: Steven Jacobs <[email protected]> Committed: Tue May 17 16:08:32 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/vxquery/cli/VXQuery.java | 26 ++---- vxquery-core/pom.xml | 1 - .../rewriter/rules/AbstractCollectionRule.java | 3 +- .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 98 +++++--------------- .../VXQueryCollectionOperatorDescriptor.java | 45 +++------ .../metadata/VXQueryMetadataProvider.java | 17 ++-- .../runtime/functions/util/FunctionHelper.java | 17 ++-- .../org/apache/vxquery/xmlparser/XMLParser.java | 10 +- .../xmlquery/query/XMLQueryCompiler.java | 92 +++++++++++------- .../xmlquery/query/SimpleXQueryTest.java | 19 ++-- .../src/main/resources/conf/cluster.properties | 5 +- .../java/org/apache/vxquery/xtest/MiniDFS.java | 21 +---- .../org/apache/vxquery/xtest/TestRunner.java | 23 +++-- .../org/apache/vxquery/xtest/XTestOptions.java | 3 + .../vxquery/xtest/AbstractXQueryTest.java | 1 + 15 files changed, 159 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java ---------------------------------------------------------------------- diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java index a6ef702..17287c6 100644 --- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java +++ 
b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java @@ -182,7 +182,12 @@ public class VXQuery { opts.showOET, opts.showRP); start = opts.timing ? new Date() : null; - XMLQueryCompiler compiler = new XMLQueryCompiler(listener, getNodeList(), opts.frameSize, + + Map<String, NodeControllerInfo> nodeControllerInfos = null; + if (hcc != null) { + nodeControllerInfos = hcc.getNodeControllerInfos(); + } + XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize, opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf); resultSetId = createResultSetId(); CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE), @@ -234,25 +239,6 @@ public class VXQuery { } /** - * Get cluster node configuration. - * - * @return Configuration of node controllers as array of Strings. - * @throws Exception - */ - private String[] getNodeList() throws Exception { - if (hcc != null) { - Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos(); - String[] nodeList = new String[nodeControllerInfos.size()]; - int index = 0; - for (String node : nodeControllerInfos.keySet()) { - nodeList[index++] = node; - } - return nodeList; - } - return new String[0]; - } - - /** * Creates a Hyracks dataset, if not already existing with the job frame size, and 1 reader. Allocates a new buffer of size specified in the frame of Hyracks * node. Creates new dataset reader with the current job ID and result set ID. Outputs the string in buffer for each frame. 
* http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/pom.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml index 38b8030..d244818 100644 --- a/vxquery-core/pom.xml +++ b/vxquery-core/pom.xml @@ -216,7 +216,6 @@ <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-hdfs-core</artifactId> - <type>jar</type> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java index dd9965b..74220da 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import org.apache.commons.lang3.mutable.Mutable; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -59,7 +58,7 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { * Logical operator * @return collection name */ - protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) throws AlgebricksException { + protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) { AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); if (op.getOperatorTag() != 
LogicalOperatorTag.UNNEST) { http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java index 4a562cf..dcbfe94 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java @@ -16,8 +16,6 @@ */ package org.apache.vxquery.hdfs2; -import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; @@ -26,7 +24,7 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -43,24 +41,19 @@ import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.hdfs.ContextFactory; +import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory; import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; import org.xml.sax.InputSource; import org.xml.sax.SAXException; -import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; -import 
edu.uci.ics.hyracks.hdfs.ContextFactory; -import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory; - -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.vxquery.metadata.VXQueryCollectionOperatorDescriptor; - public class HDFSFunctions { private Configuration conf; @@ -70,19 +63,24 @@ public class HDFSFunctions { private InputFormat inputFormat; private List<InputSplit> splits; private ArrayList<ArrayList<String>> nodes; - private File nodeXMLfile; private HashMap<Integer, String> schedule; private final String TEMP = "java.io.tmpdir"; private final String dfs_path = "vxquery_splits_schedule.txt"; private final String filepath = System.getProperty(TEMP) + "splits_schedule.txt"; protected static final Logger LOGGER = Logger.getLogger(HDFSFunctions.class.getName()); + private final Map<String, NodeControllerInfo> nodeControllerInfos; /** * Create the configuration and add the paths for core-site and hdfs-site as resources. * Initialize an instance of HDFS FileSystem for this configuration. 
+ * + * @param nodeControllerInfos + * @param hdfsConf */ - public HDFSFunctions() { + public HDFSFunctions(Map<String, NodeControllerInfo> nodeControllerInfos, String hdfsConf) { this.conf = new Configuration(); + this.nodeControllerInfos = nodeControllerInfos; + this.conf_path = hdfsConf; } /** @@ -164,43 +162,9 @@ public class HDFSFunctions { */ private boolean locateConf() { if (this.conf_path == null) { - // load properties file - Properties prop = new Properties(); - String propFilePath = "../vxquery-server/src/main/resources/conf/cluster.properties"; - nodeXMLfile = new File("../vxquery-server/src/main/resources/conf/cluster.xml"); - if(!nodeXMLfile.exists()) { - nodeXMLfile = new File("vxquery-server/src/main/resources/conf/cluster.xml"); - if(!nodeXMLfile.exists()) { - nodeXMLfile = new File("vxquery-server/src/main/resources/conf/local.xml"); - } - if(!nodeXMLfile.exists()) { - nodeXMLfile = new File("../vxquery-server/src/main/resources/conf/local.xml"); - } - } - try { - prop.load(new FileInputStream(propFilePath)); - } catch (FileNotFoundException e) { - propFilePath = "vxquery-server/src/main/resources/conf/cluster.properties"; - try { - prop.load(new FileInputStream(propFilePath)); - } catch (IOException e1) { - if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.severe(e1.getMessage()); - } - } - } catch (IOException e) { - if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.severe(e.getMessage()); - } - return false; - } - // get the property value for HDFS_CONF - this.conf_path = prop.getProperty("HDFS_CONF"); - if (this.conf_path == null) { - this.conf_path = System.getenv("HADOOP_CONF_DIR"); - return this.conf_path != null; - } - return this.conf_path != null; + //As a last resort, try getting the configuration from the system environment + //Some systems won't have this set. 
+ this.conf_path = System.getenv("HADOOP_CONF_DIR"); } return this.conf_path != null; } @@ -339,7 +303,7 @@ public class HDFSFunctions { /** * Read the hostname and the ip address of every node from the xml cluster configuration file. - * Save the information inside an ArrayList. + * Save the information inside nodes. * * @throws ParserConfigurationException * @throws IOException @@ -349,26 +313,14 @@ public class HDFSFunctions { DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder dBuilder; dBuilder = dbFactory.newDocumentBuilder(); - Document doc = dBuilder.parse(nodeXMLfile); - doc.getDocumentElement().normalize(); - nodes = new ArrayList<ArrayList<String>>(); - NodeList nList = doc.getElementsByTagName("node"); - - for (int temp = 0; temp < nList.getLength(); temp++) { - - Node nNode = nList.item(temp); - - if (nNode.getNodeType() == Node.ELEMENT_NODE) { - - Element eElement = (Element) nNode; - ArrayList<String> info = new ArrayList<String>(); - info.add(eElement.getElementsByTagName("id").item(0).getTextContent()); - info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent()); - nodes.add(info); - } + for (NodeControllerInfo ncInfo : nodeControllerInfos.values()) { + //Will this include the master node? Is that bad? 
+ ArrayList<String> info = new ArrayList<String>(); + info.add(ncInfo.getNodeId()); + info.add(ncInfo.getNetworkAddress().getAddress()); + nodes.add(info); } - } /** http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index e2f705e..b8dca63 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -34,7 +35,6 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; -<<<<<<< HEAD import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; -======= +import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameFieldAppender; import org.apache.hyracks.api.comm.VSizeFrame; @@ -60,7 +60,8 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; ->>>>>>> master +import org.apache.hyracks.hdfs.ContextFactory; +import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory; import org.apache.vxquery.context.DynamicContext; import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; @@ -68,23 +69,6 @@ import org.apache.vxquery.xmlparser.TreeNodeIdProvider; import org.apache.vxquery.xmlparser.XMLParser; import org.xml.sax.SAXException; -<<<<<<< HEAD -import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; -import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable; -import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; -import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; -import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry; -import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils; -import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; -import edu.uci.ics.hyracks.hdfs.ContextFactory; -import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory; - -======= ->>>>>>> master public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; private short dataSourceId; @@ -96,16 +80,19 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private String tag; private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n"; private final String hdfsConf; + private final 
Map<String, NodeControllerInfo> nodeControllerInfos; public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, - RecordDescriptor rDesc, String hdfsConf) { + RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { super(spec, 1, 1); collectionPartitions = ds.getPartitions(); dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); recordDescriptors[0] = rDesc; + this.tag = ds.getTag(); this.hdfsConf = hdfsConf; + this.nodeControllerInfos = nodeControllerInfos; } @Override @@ -130,7 +117,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void open() throws HyracksDataException { appender.reset(frame, true); writer.open(); - hdfs = new HDFSFunctions(); + hdfs = new HDFSFunctions(nodeControllerInfos, hdfsConf); } @Override @@ -151,16 +138,12 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); } - parser.parseElements(xmlDocument, writer, fta, tupleIndex); + parser.parseElements(xmlDocument, writer, tupleIndex); } } -<<<<<<< HEAD } else { throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + collectionDirectory.getAbsolutePath() + ") passed to collection."); -======= - parser.parseElements(xmlDocument, writer, tupleIndex); ->>>>>>> master } } } else { @@ -178,8 +161,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO InputFormat inputFormat = hdfs.getinputFormat(); try { hdfs.scheduleSplits(); - ArrayList<Integer> schedule = hdfs.getScheduleForNode(InetAddress.getLocalHost() - .getHostAddress()); + ArrayList<Integer> schedule = hdfs + .getScheduleForNode(InetAddress.getLocalHost().getHostAddress()); List<InputSplit> splits = hdfs.getSplits(); List<FileSplit> 
fileSplits = new ArrayList<FileSplit>(); for (int i : schedule) { @@ -252,8 +235,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO xmlDocument = it.next().getPath(); if (fs.isFile(xmlDocument)) { if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " - + xmlDocument.getName()); + LOGGER.fine( + "Starting to read XML document: " + xmlDocument.getName()); } //create an input stream to the file currently reading and send it to parser InputStream in = fs.open(xmlDocument).getWrappedStream(); http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java index 5310dfd..820c365 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.vxquery.context.StaticContext; - import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -45,6 +43,7 @@ import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; +import org.apache.hyracks.api.client.NodeControllerInfo; import 
org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -52,19 +51,22 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; +import org.apache.vxquery.context.StaticContext; public class VXQueryMetadataProvider implements IMetadataProvider<String, String> { private final String[] nodeList; private final Map<String, File> sourceFileMap; private final StaticContext staticCtx; private final String hdfsConf; + private final Map<String, NodeControllerInfo> nodeControllerInfos; public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx, - String hdfsConf) { + String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { this.nodeList = nodeList; this.sourceFileMap = sourceFileMap; this.staticCtx = staticCtx; this.hdfsConf = hdfsConf; + this.nodeControllerInfos = nodeControllerInfos; } @Override @@ -85,7 +87,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig) - throws AlgebricksException { + throws AlgebricksException { VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource; if (sourceFileMap != null) { final int len = ds.getPartitions().length; @@ -98,7 +100,8 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String ds.setPartitions(collectionPartitions); } RecordDescriptor rDesc = new RecordDescriptor(new 
ISerializerDeserializer[opSchema.getSize()]); - IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf); + IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf, + this.nodeControllerInfos); AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount()); return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint); @@ -132,7 +135,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String @Override public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) - throws AlgebricksException { + throws AlgebricksException { throw new UnsupportedOperationException(); } @@ -158,7 +161,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) - throws AlgebricksException { + throws AlgebricksException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java index 854f6c6..1df6439 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java +++ 
b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java @@ -27,6 +27,13 @@ import java.util.Arrays; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.DoublePointable; +import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; import org.apache.vxquery.context.DynamicContext; import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; import org.apache.vxquery.datamodel.accessors.TypedPointables; @@ -47,14 +54,6 @@ import org.apache.vxquery.types.BuiltinTypeConstants; import org.apache.vxquery.types.BuiltinTypeRegistry; import org.apache.vxquery.xmlparser.XMLParser; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.DoublePointable; -import org.apache.hyracks.data.std.primitive.LongPointable; -import org.apache.hyracks.data.std.primitive.UTF8StringPointable; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; - public class FunctionHelper { public static void arithmeticOperation(AbstractArithmeticOperation aOp, DynamicContext dCtx, @@ -1209,7 +1208,7 @@ public class FunctionHelper { //else check in HDFS file system else { fName = fName.replaceAll("hdfs:/", ""); - HDFSFunctions hdfs = new HDFSFunctions(); + HDFSFunctions hdfs = new HDFSFunctions(null, null); FileSystem fs = hdfs.getFileSystem(); if (fs != null) { Path xmlDocument = new Path(fName); 
http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java index b6eb621..a62a26c 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.comm.IFrameFieldAppender; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.vxquery.context.StaticContext; import org.apache.vxquery.exceptions.VXQueryFileNotFoundException; import org.apache.vxquery.exceptions.VXQueryParseException; @@ -99,8 +100,7 @@ public class XMLParser { } } - public void parseElements(File file, IFrameWriter writer, int tupleIndex) - throws HyracksDataException { + public void parseElements(File file, IFrameWriter writer, int tupleIndex) throws HyracksDataException { try { Reader input; if (bufferSize > 0) { @@ -127,7 +127,6 @@ public class XMLParser { } } -<<<<<<< HEAD public void parseHDFSElements(InputStream inputStream, IFrameWriter writer, FrameTupleAccessor fta, int tupleIndex) throws IOException { try { @@ -138,7 +137,7 @@ public class XMLParser { input = new InputStreamReader(inputStream); } in.setCharacterStream(input); - handler.setupElementWriter(writer, fta, tupleIndex); + handler.setupElementWriter(writer, tupleIndex); parser.parse(in); input.close(); } catch (IOException e) { @@ -172,7 +171,4 @@ public class XMLParser { e.printStackTrace(); } } - -======= ->>>>>>> master } 
http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java index f2e4dd3..8a044ea 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java @@ -17,28 +17,7 @@ package org.apache.vxquery.xmlquery.query; import java.io.Reader; import java.util.ArrayList; import java.util.List; - -import org.apache.vxquery.compiler.CompilerControlBlock; -import org.apache.vxquery.compiler.algebricks.VXQueryBinaryBooleanInspectorFactory; -import org.apache.vxquery.compiler.algebricks.VXQueryBinaryIntegerInspectorFactory; -import org.apache.vxquery.compiler.algebricks.VXQueryComparatorFactoryProvider; -import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; -import org.apache.vxquery.compiler.algebricks.VXQueryExpressionRuntimeProvider; -import org.apache.vxquery.compiler.algebricks.VXQueryNullWriterFactory; -import org.apache.vxquery.compiler.algebricks.VXQueryPrinterFactoryProvider; -import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor; -import org.apache.vxquery.compiler.rewriter.RewriteRuleset; -import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext; -import org.apache.vxquery.exceptions.ErrorCode; -import org.apache.vxquery.exceptions.SystemException; -import org.apache.vxquery.metadata.VXQueryMetadataProvider; -import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFactoryProvider; -import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFamilyProvider; -import org.apache.vxquery.types.BuiltinTypeRegistry; -import 
org.apache.vxquery.types.Quantifier; -import org.apache.vxquery.types.SequenceType; -import org.apache.vxquery.xmlquery.ast.ModuleNode; -import org.apache.vxquery.xmlquery.translator.XMLQueryTranslator; +import java.util.Map; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; @@ -67,16 +46,38 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFact import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.vxquery.compiler.CompilerControlBlock; +import org.apache.vxquery.compiler.algebricks.VXQueryBinaryBooleanInspectorFactory; +import org.apache.vxquery.compiler.algebricks.VXQueryBinaryIntegerInspectorFactory; +import org.apache.vxquery.compiler.algebricks.VXQueryComparatorFactoryProvider; +import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; +import org.apache.vxquery.compiler.algebricks.VXQueryExpressionRuntimeProvider; +import org.apache.vxquery.compiler.algebricks.VXQueryNullWriterFactory; +import org.apache.vxquery.compiler.algebricks.VXQueryPrinterFactoryProvider; +import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor; +import org.apache.vxquery.compiler.rewriter.RewriteRuleset; +import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext; +import org.apache.vxquery.exceptions.ErrorCode; +import org.apache.vxquery.exceptions.SystemException; +import 
org.apache.vxquery.metadata.VXQueryMetadataProvider; +import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFactoryProvider; +import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFamilyProvider; +import org.apache.vxquery.types.BuiltinTypeRegistry; +import org.apache.vxquery.types.Quantifier; +import org.apache.vxquery.types.SequenceType; +import org.apache.vxquery.xmlquery.ast.ModuleNode; +import org.apache.vxquery.xmlquery.translator.XMLQueryTranslator; public class XMLQueryCompiler { private final XQueryCompilationListener listener; private final ICompilerFactory cFactory; - + private final String hdfsConf; private LogicalOperatorPrettyPrintVisitor pprinter; @@ -89,18 +90,26 @@ public class XMLQueryCompiler { private int frameSize; + private Map<String, NodeControllerInfo> nodeControllerInfos; + private String[] nodeList; - - public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize) { - this(listener, nodeList, frameSize, -1, -1, -1, ""); + public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos, + int frameSize, String hdfsConf) { + this(listener, nodeControllerInfos, frameSize, -1, -1, -1, hdfsConf); + } + + public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos, + int frameSize) { + this(listener, nodeControllerInfos, frameSize, -1, -1, -1, ""); } - public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize, - int availableProcessors, long joinHashSize, long maximumDataSize, String hdfsConf) { + public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos, + int frameSize, int availableProcessors, long joinHashSize, long maximumDataSize, String hdfsConf) { this.listener = listener == null ? 
NoopXQueryCompilationListener.INSTANCE : listener; this.frameSize = frameSize; - this.nodeList = nodeList; + this.nodeControllerInfos = nodeControllerInfos; + setNodeList(); this.hdfsConf = hdfsConf; HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder( new IOptimizationContextFactory() { @@ -120,12 +129,12 @@ public class XMLQueryCompiler { builder.getPhysicalOptimizationConfig().setMaxFramesHybridHash((int) (joinHashSize / this.frameSize)); } if (maximumDataSize > 0) { - builder.getPhysicalOptimizationConfig().setMaxFramesLeftInputHybridHash( - (int) (maximumDataSize / this.frameSize)); + builder.getPhysicalOptimizationConfig() + .setMaxFramesLeftInputHybridHash((int) (maximumDataSize / this.frameSize)); } - builder.getPhysicalOptimizationConfig().setMaxFramesLeftInputHybridHash( - (int) (60L * 1024 * 1048576 / this.frameSize)); + builder.getPhysicalOptimizationConfig() + .setMaxFramesLeftInputHybridHash((int) (60L * 1024 * 1048576 / this.frameSize)); builder.setLogicalRewrites(buildDefaultLogicalRewrites()); builder.setPhysicalRewrites(buildDefaultPhysicalRewrites()); @@ -196,15 +205,26 @@ public class XMLQueryCompiler { cFactory = builder.create(); } + /** + * Set Configuration of node controllers as array of Strings. 
+ */ + private void setNodeList() { + nodeList = new String[nodeControllerInfos.size()]; + int index = 0; + for (String node : nodeControllerInfos.keySet()) { + nodeList[index++] = node; + } + } + public void compile(String name, Reader query, CompilerControlBlock ccb, int optimizationLevel) throws SystemException { moduleNode = XMLQueryParser.parse(name, query); listener.notifyParseResult(moduleNode); module = new XMLQueryTranslator(ccb).translateModule(moduleNode); - pprinter = new LogicalOperatorPrettyPrintVisitor(new VXQueryLogicalExpressionPrettyPrintVisitor( - module.getModuleContext())); + pprinter = new LogicalOperatorPrettyPrintVisitor( + new VXQueryLogicalExpressionPrettyPrintVisitor(module.getModuleContext())); VXQueryMetadataProvider mdProvider = new VXQueryMetadataProvider(nodeList, ccb.getSourceFileMap(), - module.getModuleContext(), this.hdfsConf); + module.getModuleContext(), this.hdfsConf, nodeControllerInfos); compiler = cFactory.createCompiler(module.getBody(), mdProvider, 0); listener.notifyTranslationResult(module); XMLQueryTypeChecker.typeCheckModule(module); http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java b/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java index 165b330..13ca921 100644 --- a/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java +++ b/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java @@ -20,16 +20,19 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; import java.util.zip.GZIPInputStream; -import junit.framework.Assert; - +import 
org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.vxquery.compiler.CompilerControlBlock; import org.apache.vxquery.context.RootStaticContextImpl; import org.apache.vxquery.context.StaticContextImpl; import org.junit.Test; -import org.apache.hyracks.api.dataset.ResultSetId; +import junit.framework.Assert; public class SimpleXQueryTest { @Test @@ -96,8 +99,8 @@ public class SimpleXQueryTest { private static String gunzip(String dir, String filename) { try { - GZIPInputStream in = new GZIPInputStream(new BufferedInputStream(new FileInputStream(new File(dir - + filename + ".gz")))); + GZIPInputStream in = new GZIPInputStream( + new BufferedInputStream(new FileInputStream(new File(dir + filename + ".gz")))); File temp = File.createTempFile("vxquery", filename); temp.deleteOnExit(); FileOutputStream out = new FileOutputStream(temp); @@ -133,7 +136,11 @@ public class SimpleXQueryTest { } private static void runTestInternal(String testName, String query) throws Exception { - XMLQueryCompiler compiler = new XMLQueryCompiler(null, new String[] { "nc1" }, 65536); + + Map<String, NodeControllerInfo> nodeControllerInfos = new HashMap<String, NodeControllerInfo>(); + nodeControllerInfos.put("nc1", new NodeControllerInfo("nc1", null, new NetworkAddress("127.0.0.1", 0), null)); + + XMLQueryCompiler compiler = new XMLQueryCompiler(null, nodeControllerInfos, 65536); CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE), new ResultSetId(System.nanoTime()), null); compiler.compile(testName, new StringReader(query), ccb, Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-server/src/main/resources/conf/cluster.properties ---------------------------------------------------------------------- diff --git a/vxquery-server/src/main/resources/conf/cluster.properties 
b/vxquery-server/src/main/resources/conf/cluster.properties index 5b8fe9f..6339fd9 100644 --- a/vxquery-server/src/main/resources/conf/cluster.properties +++ b/vxquery-server/src/main/resources/conf/cluster.properties @@ -51,7 +51,4 @@ CCJAVA_OPTS="-server -Xmx500M -Djava.util.logging.config.file=./vxquery-benchmar NCJAVA_OPTS="-server -Xmx7G -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/testing_logging.properties" # debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties" # Yourkit option: -agentpath:/tools/yjp-2014-build-14114/bin/linux-x86-64/libyjpagent.so=port=20001" -# Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling - -#HDFS configuration directory -HDFS_CONF=/Users/stevenjacobs/asterix/vxquery/vxquery-xtest/src/test/resources/hadoop/conf \ No newline at end of file +# Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling \ No newline at end of file http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java index c8c5fa8..0720baa 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java @@ -20,9 +20,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.MiniDFSCluster; import 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.mapred.JobConf; @@ -40,15 +38,6 @@ public class MiniDFS { FileSystem lfs = FileSystem.getLocal(new Configuration()); JobConf conf = new JobConf(); String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf"; - Path hdfs_conf = new Path(PATH_TO_HADOOP_CONF); - if (!lfs.exists(hdfs_conf)) { - PATH_TO_HADOOP_CONF = "vxquery-xtest/src/test/resources/hadoop/conf"; - hdfs_conf = new Path(PATH_TO_HADOOP_CONF); - if (!lfs.exists(hdfs_conf)) { - PATH_TO_HADOOP_CONF = "../vxquery-xtest/src/test/resources/hadoop/conf"; - hdfs_conf = new Path(PATH_TO_HADOOP_CONF); - } - } conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml")); conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml")); conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml")); @@ -71,15 +60,7 @@ public class MiniDFS { FileSystem dfs = FileSystem.get(conf); String DATA_PATH = "src/test/resources/TestSources/ghcnd"; Path src = new Path(DATA_PATH); - if (!lfs.exists(src)) { - DATA_PATH = "vxquery-xtest/src/test/resources/TestSources/ghcnd"; - src = new Path(DATA_PATH); - if (!lfs.exists(src)) { - DATA_PATH = "../vxquery-xtest/src/test/resources/TestSources/ghcnd"; - src = new Path(DATA_PATH); - } - } - dfs.mkdirs(new Path("/tmp")); + dfs.mkdirs(new Path("/tmp")); Path dest = new Path("/tmp/vxquery-hdfs-test"); dfs.copyFromLocalFile(src, dest); if (dfs.exists(dest)) { http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java index 18db5c1..5b6ddff 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java +++ 
b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java @@ -18,13 +18,16 @@ import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.InetAddress; import java.util.EnumSet; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.io.IOUtils; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.VSizeFrame; @@ -63,6 +66,7 @@ public class TestRunner { private NodeControllerService nc1; private IHyracksClientConnection hcc; private IHyracksDataset hds; + private final String publicAddress = InetAddress.getLocalHost().getHostAddress(); public TestRunner(XTestOptions opts) throws Exception { this.opts = opts; @@ -70,9 +74,9 @@ public class TestRunner { public void open() throws Exception { CCConfig ccConfig = new CCConfig(); - ccConfig.clientNetIpAddress = "127.0.0.1"; + ccConfig.clientNetIpAddress = publicAddress; ccConfig.clientNetPort = 39000; - ccConfig.clusterNetIpAddress = "127.0.0.1"; + ccConfig.clusterNetIpAddress = publicAddress; ccConfig.clusterNetPort = 39001; ccConfig.profileDumpPeriod = 10000; File outDir = new File("target/ClusterController"); @@ -87,9 +91,9 @@ public class TestRunner { NCConfig ncConfig1 = new NCConfig(); ncConfig1.ccHost = "localhost"; ncConfig1.ccPort = 39001; - ncConfig1.clusterNetIPAddress = "127.0.0.1"; - ncConfig1.dataIPAddress = "127.0.0.1"; - ncConfig1.resultIPAddress = "127.0.0.1"; + ncConfig1.clusterNetIPAddress = publicAddress; + ncConfig1.dataIPAddress = publicAddress; + ncConfig1.resultIPAddress = publicAddress; ncConfig1.nodeId = "nc1"; nc1 = new NodeControllerService(ncConfig1); nc1.start(); @@ -115,7 +119,14 @@ public class 
TestRunner { VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET, opts.showOET, opts.showRP); - XMLQueryCompiler compiler = new XMLQueryCompiler(listener, new String[] { "nc1" }, opts.frameSize); + + Map<String, NodeControllerInfo> nodeControllerInfos = null; + if (hcc != null) { + nodeControllerInfos = hcc.getNodeControllerInfos(); + } + + XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize, + opts.hdfsConf); Reader in = new InputStreamReader(new FileInputStream(testCase.getXQueryFile()), "UTF-8"); CompilerControlBlock ccb = new CompilerControlBlock( new StaticContextImpl(RootStaticContextImpl.INSTANCE), http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java index 854cbf8..496b74a 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java @@ -76,4 +76,7 @@ public class XTestOptions { @Option(name = "-showresult", usage = "Show query result.") boolean showResult; + + @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files") + String hdfsConf; } http://git-wip-us.apache.org/repos/asf/vxquery/blob/66ff50ae/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java index 12a91a9..5411215 100644 --- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java +++ 
b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java @@ -37,6 +37,7 @@ public abstract class AbstractXQueryTest { opts.threads = 1; opts.showQuery = true; opts.showResult = true; + opts.hdfsConf = "src/test/resources/hadoop/conf"; return opts; }
