Repository: vxquery Updated Branches: refs/heads/master 33b3b79e3 -> 5d1175d2c
Move to Hyracks 0.3.3 Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/5d1175d2 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/5d1175d2 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/5d1175d2 Branch: refs/heads/master Commit: 5d1175d2cb04a54ba751295f2ac67daec38bf723 Parents: 33b3b79 Author: Till Westmann <[email protected]> Authored: Sun Jul 1 11:28:11 2018 -0700 Committer: Till Westmann <[email protected]> Committed: Mon Oct 15 19:11:31 2018 -0700 ---------------------------------------------------------------------- pom.xml | 4 +- .../VXQueryComparatorFactoryProvider.java | 10 ++- .../rules/IntroduceTwoStepAggregateRule.java | 45 ++++++++------ .../VXQueryCollectionOperatorDescriptor.java | 7 ++- .../VXQueryIndexingOperatorDescriptor.java | 6 +- .../ShowIndexesScalarEvaluatorFactory.java | 2 +- .../FnDocAvailableScalarEvaluatorFactory.java | 2 +- .../node/FnDocScalarEvaluatorFactory.java | 2 +- .../apache/vxquery/app/VXQueryApplication.java | 33 ++++++++-- .../vxquery/app/util/LocalClusterUtil.java | 65 +++++++------------- .../vxquery/rest/request/QueryRequest.java | 2 +- .../vxquery/rest/service/VXQueryService.java | 6 +- .../apache/vxquery/xtest/TestClusterUtil.java | 36 ++--------- .../java/org/apache/vxquery/xtest/XTest.java | 2 +- .../vxquery/xtest/AbstractXQueryTest.java | 4 +- 15 files changed, 111 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8b634f0..39a90e0 100644 --- a/pom.xml +++ b/pom.xml @@ -236,7 +236,7 @@ <dependency> <groupId>args4j</groupId> <artifactId>args4j</artifactId> - <version>2.0.9</version> + <version>2.33</version> </dependency> <dependency> @@ -762,7 +762,7 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <hyracks.fullstack.version>0.3.1</hyracks.fullstack.version> - <hyracks.version>0.3.0</hyracks.version> + <hyracks.version>0.3.3</hyracks.version> <lucene.version>5.5.1</lucene.version> <hadoop.version>2.7.0</hadoop.version> <apache-rat-plugin.version>0.11</apache-rat-plugin.version> http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java index b7196cf..a510e1c 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java @@ -16,12 +16,12 @@ */ package org.apache.vxquery.compiler.algebricks; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; public class VXQueryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider { @Override @@ -30,6 +30,12 @@ public class VXQueryComparatorFactoryProvider implements IBinaryComparatorFactor return new BinaryComparatorFactory(type, ascending); } + @Override + public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending, boolean ignoreCase) + throws AlgebricksException { + throw new NotImplementedException(); + } + private static class BinaryComparatorFactory implements IBinaryComparatorFactory { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java index 806b532..962b851 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java @@ -17,12 +17,10 @@ package org.apache.vxquery.compiler.rewriter.rules; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.lang3.mutable.Mutable; -import org.apache.vxquery.functions.BuiltinFunctions; -import org.apache.vxquery.functions.BuiltinOperators; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; @@ -37,6 +35,8 @@ import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.vxquery.functions.BuiltinFunctions; +import org.apache.vxquery.functions.BuiltinOperators; /** * The rule searches for aggregate operators with an aggregate function @@ -91,28 +91,37 @@ public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule { if (op.getOperatorTag() != LogicalOperatorTag.AGGREGATE) { return false; } - AggregateOperator aggregate = (AggregateOperator) op; - if (aggregate.getExpressions().size() == 0) { + final AggregateOperator aggregate = (AggregateOperator) op; + AggregateFunctionCallExpression aggregateFunctionCall = getAggregateFunctionCall(aggregate); + if (aggregateFunctionCall == null || aggregateFunctionCall.isTwoStep()) { return false; } - Mutable<ILogicalExpression> mutableLogicalExpression = aggregate.getExpressions().get(0); - ILogicalExpression logicalExpression = mutableLogicalExpression.getValue(); + // Replace single step aggregate function with two step function call + final IFunctionInfo functionInfo = aggregateFunctionCall.getFunctionInfo(); + final List<Mutable<ILogicalExpression>> arguments = aggregateFunctionCall.getArguments(); + AggregateFunctionCallExpression twoStepCall = + new AggregateFunctionCallExpression(functionInfo, true, arguments); + final Pair<IFunctionInfo, IFunctionInfo> functionInfoPair = + AGGREGATE_MAP.get(aggregateFunctionCall.getFunctionIdentifier()); + twoStepCall.setStepOneAggregate(functionInfoPair.first); + twoStepCall.setStepTwoAggregate(functionInfoPair.second); + aggregate.getExpressions().get(0).setValue(twoStepCall); + return true; + } + + private AggregateFunctionCallExpression getAggregateFunctionCall(AggregateOperator aggregate) { + if (aggregate.getExpressions().size() == 0) { + return null; + } + ILogicalExpression logicalExpression = aggregate.getExpressions().get(0).getValue(); if (logicalExpression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { - return false; + return null; } AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; - if (AGGREGATE_MAP.containsKey(functionCall.getFunctionIdentifier())) { - AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall; - if (aggregateFunctionCall.isTwoStep()) { - return false; - } - aggregateFunctionCall.setTwoStep(true); - aggregateFunctionCall.setStepOneAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).first); - aggregateFunctionCall.setStepTwoAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).second); - return true; + return (AggregateFunctionCallExpression) functionCall; } - return false; + return null; } @Override http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index a3756d5..5ae5ed7 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -23,6 +23,7 @@ import java.io.FileNotFoundException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.Inet4Address; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -92,7 +93,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); valueSeq = ds.getValueSeq(); - recordDescriptors[0] = rDesc; + outRecDescs[0] = rDesc; this.tag = ds.getTag(); this.hdfsConf = hdfsConf; this.nodeControllerInfos = nodeControllerInfos; @@ -108,7 +109,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount); final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition(); final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources); - final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); + final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId(); final DynamicContext dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData(); final ArrayBackedValueStorage jsonAbvs = new ArrayBackedValueStorage(); final String collectionName = collectionPartitions[partition % collectionPartitions.length]; @@ -157,7 +158,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO try { hdfs.scheduleSplits(); ArrayList<Integer> schedule = hdfs - .getScheduleForNode(InetAddress.getLocalHost().getHostAddress()); + .getScheduleForNode(Inet4Address.getLoopbackAddress().getHostAddress()); List<InputSplit> splits = hdfs.getSplits(); List<FileSplit> fileSplits = new ArrayList<>(); for (int i : schedule) { http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java index 9353319..c26547e 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java @@ -69,7 +69,7 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe collectionPartitions = ds.getPartitions(); dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); - recordDescriptors[0] = rDesc; + outRecDescs[0] = rDesc; childSeq = ds.getChildSeq(); indexChildSeq = ds.getIndexChildSeq(); indexAttsSeq = ds.getIndexAttsSeq(); @@ -86,11 +86,11 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount); final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition(); final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources); - final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); + final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId(); final String collectionName = collectionPartitions[partition % collectionPartitions.length]; final String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil( - ctx.getIOManager().getIODevices().get(0).getMount()); + ctx.getIoManager().getIODevices().get(0).getMount()); indexCentralizerUtil.readIndexDirectory(); final IPointable result = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java index 6b18b33..6004d79 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java @@ -52,7 +52,7 @@ public class ShowIndexesScalarEvaluatorFactory extends AbstractTaggedValueArgume abvs.reset(); sb.reset(abvs); IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil( - ctx.getIOManager().getIODevices().get(0).getMount()); + ctx.getIoManager().getIODevices().get(0).getMount()); indexCentralizerUtil.readIndexDirectory(); indexCentralizerUtil.getAllCollections(sb); sb.finish(); http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java index 15fd624..6d63d5f 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java @@ -57,7 +57,7 @@ public class FnDocAvailableScalarEvaluatorFactory extends AbstractTaggedValueArg final DataInputStream di = new DataInputStream(bbis); final int partition = ctx.getTaskAttemptId().getTaskId().getPartition(); final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition); - final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); + final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId(); return new AbstractTaggedValueArgumentScalarEvaluator(args) { @Override http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java index 2fd1755..e3157af 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java @@ -57,7 +57,7 @@ public class FnDocScalarEvaluatorFactory extends AbstractTaggedValueArgumentScal final DataInputStream di = new DataInputStream(bbis); final int partition = ctx.getTaskAttemptId().getTaskId().getPartition(); final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition); - final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); + final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId(); return new AbstractTaggedValueArgumentScalarEvaluator(args) { @Override http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java b/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java index f5e0165..e2ca1b7 100644 --- a/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java +++ b/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java @@ -28,9 +28,13 @@ import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.hyracks.api.application.ICCApplicationContext; -import org.apache.hyracks.api.application.ICCApplicationEntryPoint; +import org.apache.hyracks.api.application.ICCApplication; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.ClusterControllerInfo; +import org.apache.hyracks.api.config.IConfigManager; +import org.apache.hyracks.api.job.resource.DefaultJobCapacityController; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.vxquery.exceptions.VXQueryRuntimeException; import org.apache.vxquery.rest.RestServer; import org.apache.vxquery.rest.service.VXQueryConfig; @@ -44,15 +48,20 @@ import org.kohsuke.args4j.Option; * * @author Erandi Ganepola */ -public class VXQueryApplication implements ICCApplicationEntryPoint { +public class VXQueryApplication implements ICCApplication { private static final Logger LOGGER = Logger.getLogger(VXQueryApplication.class.getName()); private VXQueryService vxQueryService; private RestServer restServer; + private ICCServiceContext ccAppCtx; + + public void init(IServiceContext serviceCtx) throws Exception { + ccAppCtx = (ICCServiceContext)serviceCtx; + } @Override - public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception { + public void start(String[] args) throws Exception { AppArgs appArgs = new AppArgs(); if (args != null) { CmdLineParser parser = new CmdLineParser(appArgs); @@ -98,6 +107,22 @@ public class VXQueryApplication implements ICCApplicationEntryPoint { } } + + @Override + public Object getApplicationContext() { + return ccAppCtx; + } + + @Override + public void registerConfig(IConfigManager configManager) { + throw new UnsupportedOperationException(); + } + + @Override + public IJobCapacityController getJobCapacityController() { + return DefaultJobCapacityController.INSTANCE; + } + /** * Loads properties from * http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java b/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java index cd149dd..998563c 100644 --- a/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java +++ b/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java @@ -24,7 +24,7 @@ import static org.apache.vxquery.rest.Constants.Properties.JOIN_HASH_SIZE; import static org.apache.vxquery.rest.Constants.Properties.MAXIMUM_DATA_SIZE; import java.io.IOException; -import java.net.InetAddress; +import java.net.Inet4Address; import java.net.UnknownHostException; import java.nio.file.Files; import java.util.Arrays; @@ -59,8 +59,6 @@ public class LocalClusterUtil { private ClusterControllerService clusterControllerService; private NodeControllerService nodeControllerSerivce; - private IHyracksClientConnection hcc; - private IHyracksDataset hds; private VXQueryService vxQueryService; public void init(VXQueryConfig config) throws Exception { @@ -77,19 +75,14 @@ public class LocalClusterUtil { clusterControllerService = new ClusterControllerService(ccConfig); clusterControllerService.start(); - hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); - hds = new HyracksDataset(hcc, config.getFrameSize(), config.getAvailableProcessors()); - // Node controller NCConfig ncConfig = createNCConfig(); nodeControllerSerivce = new NodeControllerService(ncConfig); nodeControllerSerivce.start(); - hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); - // REST controller - config.setHyracksClientIp(ccConfig.clientNetIpAddress); - config.setHyracksClientPort(ccConfig.clientNetPort); + config.setHyracksClientIp(ccConfig.getClientListenAddress()); + config.setHyracksClientPort(ccConfig.getClientListenPort()); vxQueryService = new VXQueryService(config); vxQueryService.start(); } @@ -97,35 +90,30 @@ public class LocalClusterUtil { protected CCConfig createCCConfig() throws IOException { String localAddress = getIpAddress(); CCConfig ccConfig = new CCConfig(); - ccConfig.clientNetIpAddress = localAddress; - ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT; - ccConfig.clusterNetIpAddress = localAddress; - ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT; - ccConfig.httpPort = DEFAULT_HYRACKS_CC_HTTP_PORT; - ccConfig.profileDumpPeriod = 10000; - ccConfig.appCCMainClass = VXQueryApplication.class.getName(); - ccConfig.appArgs = Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT)); - + ccConfig.setClientListenAddress(localAddress); + ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT); + ccConfig.setClusterListenAddress(localAddress); + ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); + ccConfig.setConsoleListenPort(DEFAULT_HYRACKS_CC_HTTP_PORT); + ccConfig.setProfileDumpPeriod(10000); + ccConfig.setAppClass(VXQueryApplication.class.getName()); + ccConfig.getAppArgs().addAll(Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT))); return ccConfig; } protected NCConfig createNCConfig() throws IOException { String localAddress = getIpAddress(); - NCConfig ncConfig = new NCConfig(); - ncConfig.ccHost = "localhost"; - ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT; - ncConfig.clusterNetIPAddress = localAddress; - ncConfig.dataIPAddress = localAddress; - ncConfig.resultIPAddress = localAddress; - ncConfig.nodeId = "test_node"; - ncConfig.ioDevices = Files.createTempDirectory(ncConfig.nodeId).toString(); + String nodeId = "test_node"; + NCConfig ncConfig = new NCConfig(nodeId); + ncConfig.setClusterAddress("localhost"); + ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); + ncConfig.setClusterListenAddress(localAddress); + ncConfig.setDataListenAddress(localAddress); + ncConfig.setResultListenAddress(localAddress); + ncConfig.setIODevices(new String[] { Files.createTempDirectory(nodeId).toString() }); + ncConfig.setVirtualNC(); return ncConfig; } - - public IHyracksClientConnection getHyracksClientConnection() { - return hcc; - } - public VXQueryService getVxQueryService() { return vxQueryService; } @@ -166,21 +154,10 @@ public class LocalClusterUtil { } public String getIpAddress() throws UnknownHostException { - return InetAddress.getLocalHost().getHostAddress(); + return Inet4Address.getLoopbackAddress().getHostAddress(); } public int getRestPort() { return DEFAULT_VXQUERY_REST_PORT; } - - @Deprecated - public IHyracksClientConnection getConnection() { - return hcc; - } - - @Deprecated - public IHyracksDataset getDataset() { - return hds; - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java index a88ae1c..6c3a25d 100644 --- a/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java @@ -140,7 +140,7 @@ public class QueryRequest { } public String toString() { - return String.format("{ statement : %s }", statement); + return String.format("{ statement : \"%s\" }", statement); } public String getRequestId() { http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java index 1d51b6a..884abf4 100644 --- a/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java @@ -186,6 +186,10 @@ public class VXQueryService { return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM) .withMessage("Hyracks connection problem: " + e.getMessage()).build()); } + if (nodeControllerInfos.isEmpty()) { + return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM) + .withMessage("No NodeControllers available").build()); + } // Adding a query compilation listener VXQueryCompilationListener listener = new VXQueryCompilationListener(response, @@ -360,7 +364,7 @@ public class VXQueryService { // This loop is required for XTests to reliably identify the error code of // SystemException. - while (reader.getResultStatus() == DatasetJobRecord.Status.RUNNING) { + while (reader.getResultStatus().getState() == DatasetJobRecord.State.RUNNING) { Thread.sleep(100); } http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java index 4d2ae8a..b2d7f04 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java @@ -17,18 +17,11 @@ package org.apache.vxquery.xtest; -import org.apache.hyracks.api.client.HyracksConnection; -import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.vxquery.app.util.LocalClusterUtil; import org.apache.vxquery.rest.service.VXQueryConfig; -import java.io.IOException; - public class TestClusterUtil { - private static HyracksConnection hcc; - private static HyracksDataset hds; - public static final LocalClusterUtil localClusterUtil = new LocalClusterUtil(); private TestClusterUtil() { @@ -44,31 +37,12 @@ public class TestClusterUtil { return vxqConfig; } - public static void startCluster(XTestOptions opts, LocalClusterUtil localClusterUtil) throws IOException { - try { - VXQueryConfig config = loadConfiguration(opts); - localClusterUtil.init(config); - hcc = (HyracksConnection) localClusterUtil.getConnection(); - hds = (HyracksDataset) localClusterUtil.getDataset(); - } catch (Exception e) { - throw new IOException(e); - } - } - - public static void stopCluster(LocalClusterUtil localClusterUtil) throws IOException { - try { - localClusterUtil.deinit(); - } catch (Exception e) { - throw new IOException(e); - } + public static void startCluster(XTestOptions opts, LocalClusterUtil localClusterUtil) throws Exception { + VXQueryConfig config = loadConfiguration(opts); + localClusterUtil.init(config); } - public static HyracksConnection getConnection() { - return hcc; + public static void stopCluster(LocalClusterUtil localClusterUtil) throws Exception { + localClusterUtil.deinit(); } - - public static HyracksDataset getDataset() { - return hds; - } - } http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java index df7a71d..df10271 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java @@ -100,7 +100,7 @@ public class XTest { } try { TestClusterUtil.stopCluster(TestClusterUtil.localClusterUtil); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } try { http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java index 8f77de4..afce2f1 100644 --- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java +++ b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java @@ -87,7 +87,7 @@ public abstract class AbstractXQueryTest { } @BeforeClass - public static void setup() throws IOException { + public static void setup() throws Exception { TestClusterUtil.startCluster(getDefaultTestOptions(), TestClusterUtil.localClusterUtil); setupFS(); } @@ -109,7 +109,7 @@ public abstract class AbstractXQueryTest { } @AfterClass - public static void shutdown() throws IOException { + public static void shutdown() throws Exception { removeFS(); TestClusterUtil.stopCluster(TestClusterUtil.localClusterUtil); }
