Implements concurrent query management support. The following changes are included: -- factor out JobManager, NodeManager, and ResourceManager from ClusterControllerService; -- let each application plug in its own IJobCapacityController implementation; -- let each job specify its required cluster capacity; -- add a required cluster capacity estimation visitor for optimized query plans; -- add admission control and queuing for queries, but always execute DDLs and DMLs immediately; -- add tests for JobManager, NodeManager, ClusterCapacity, ClusterCapacityVisitor, and IJobCapacityController; -- enlarge the -Xmx setting for ManagixSqlppExecutionTest.
Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1424 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e0c232d2 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e0c232d2 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e0c232d2 Branch: refs/heads/master Commit: e0c232d2764307e25db767f397a2fd7e577bf338 Parents: 6224966 Author: Yingyi Bu <yin...@couchbase.com> Authored: Tue Jan 24 09:02:45 2017 -0800 Committer: Till Westmann <ti...@apache.org> Committed: Wed Jan 25 06:45:40 2017 -0800 ---------------------------------------------------------------------- .../apache/asterix/api/common/APIFramework.java | 119 +-- .../app/resource/RequiredCapacityVisitor.java | 364 +++++++++ .../bootstrap/CCApplicationEntryPoint.java | 14 +- .../bootstrap/ClusterLifecycleListener.java | 5 +- .../bootstrap/GlobalRecoveryManager.java | 3 +- .../bootstrap/NCApplicationEntryPoint.java | 15 + .../asterix/messaging/CCMessageBroker.java | 5 +- .../org/apache/asterix/util/ResourceUtils.java | 70 ++ .../asterix/api/common/APIFrameworkTest.java | 84 ++- .../resource/RequiredCapacityVisitorTest.java | 174 +++++ .../queries_sqlpp/tpcds/q09/q09.3.query.sqlpp | 3 + .../common/api/IClusterEventsSubscriber.java | 3 +- .../integrationts/asterix-configuration.xml | 2 +- .../cluster/RemoveNodeWorkResponse.java | 3 +- asterixdb/asterix-runtime/pom.xml | 11 + .../job/resource/JobCapacityController.java | 76 ++ .../asterix/runtime/util/RuntimeUtils.java | 4 +- .../job/resource/JobCapacityControllerTest.java | 98 +++ .../server/test/SampleLocalClusterIT.java | 4 +- .../common/exceptions/AlgebricksException.java | 20 + 
...ialFirstRuleCheckFixpointRuleController.java | 2 +- hyracks-fullstack/hyracks/hyracks-api/pom.xml | 5 + .../api/application/ICCApplicationContext.java | 11 +- .../application/ICCApplicationEntryPoint.java | 8 +- .../application/IClusterLifecycleListener.java | 3 +- .../application/INCApplicationEntryPoint.java | 10 +- .../hyracks/api/client/NodeControllerInfo.java | 10 +- ...ionActivityClusterGraphGeneratorFactory.java | 2 - .../hyracks/api/exceptions/ErrorCode.java | 19 +- .../api/exceptions/HyracksDataException.java | 80 +- .../api/exceptions/HyracksException.java | 104 ++- .../hyracks/api/job/JobSpecification.java | 32 +- .../org/apache/hyracks/api/job/JobStatus.java | 4 +- .../api/job/resource/ClusterCapacity.java | 125 ++++ .../resource/DefaultJobCapacityController.java | 40 + .../api/job/resource/IClusterCapacity.java | 76 ++ .../job/resource/IJobCapacityController.java | 60 ++ .../job/resource/IReadOnlyClusterCapacity.java | 64 ++ .../hyracks/api/job/resource/NodeCapacity.java | 58 ++ .../src/main/resources/errormsg/en.properties | 19 +- .../api/job/resource/ClusterCapacityTest.java | 85 +++ .../hyracks-control/hyracks-control-cc/pom.xml | 11 + .../hyracks/control/cc/ClientInterfaceIPCI.java | 13 +- .../control/cc/ClusterControllerIPCI.java | 6 +- .../control/cc/ClusterControllerService.java | 84 +-- .../hyracks/control/cc/NodeControllerState.java | 22 +- .../cc/adminconsole/pages/IndexPage.java | 4 +- .../cc/adminconsole/pages/JobDetailsPage.java | 2 +- .../cc/application/CCApplicationContext.java | 3 +- .../control/cc/cluster/INodeManager.java | 114 +++ .../hyracks/control/cc/cluster/NodeManager.java | 186 +++++ .../cc/executor/ActivityClusterPlanner.java | 448 +++++++++++ .../cc/executor/ActivityPartitionDetails.java | 53 ++ .../control/cc/executor/JobExecutor.java | 723 ++++++++++++++++++ .../cc/executor/PartitionConstraintSolver.java | 129 ++++ .../cc/executor/RankedRunnableTaskCluster.java | 64 ++ .../control/cc/executor/Runnability.java | 105 +++ 
.../hyracks/control/cc/job/ActivityPlan.java | 2 +- .../hyracks/control/cc/job/IJobManager.java | 112 +++ .../hyracks/control/cc/job/JobManager.java | 306 ++++++++ .../apache/hyracks/control/cc/job/JobRun.java | 52 +- .../cc/partitions/PartitionMatchMaker.java | 4 +- .../control/cc/partitions/PartitionUtils.java | 8 +- .../cc/scheduler/ActivityClusterPlanner.java | 448 ----------- .../cc/scheduler/ActivityPartitionDetails.java | 53 -- .../control/cc/scheduler/FIFOJobQueue.java | 103 +++ .../hyracks/control/cc/scheduler/IJobQueue.java | 55 ++ .../control/cc/scheduler/IResourceManager.java | 54 ++ .../control/cc/scheduler/JobScheduler.java | 745 ------------------- .../cc/scheduler/PartitionConstraintSolver.java | 129 ---- .../cc/scheduler/RankedRunnableTaskCluster.java | 51 -- .../control/cc/scheduler/ResourceManager.java | 52 ++ .../control/cc/scheduler/Runnability.java | 105 --- .../control/cc/web/JobsRESTAPIFunction.java | 4 +- .../control/cc/web/NodesRESTAPIFunction.java | 11 +- .../hyracks/control/cc/web/WebServer.java | 3 - .../control/cc/work/AbstractHeartbeatWork.java | 7 +- .../cc/work/AbstractTaskLifecycleWork.java | 4 +- .../control/cc/work/CliDeployBinaryWork.java | 15 +- .../control/cc/work/CliUnDeployBinaryWork.java | 14 +- .../control/cc/work/ClusterShutdownWork.java | 12 +- .../control/cc/work/GatherStateDumpsWork.java | 14 +- .../work/GetActivityClusterGraphJSONWork.java | 18 +- .../cc/work/GetIpAddressNodeNameMapWork.java | 9 +- .../hyracks/control/cc/work/GetJobInfoWork.java | 13 +- .../control/cc/work/GetJobRunJSONWork.java | 23 +- .../control/cc/work/GetJobStatusWork.java | 14 +- .../cc/work/GetJobSummariesJSONWork.java | 19 +- .../cc/work/GetNodeControllersInfoWork.java | 20 +- .../control/cc/work/GetNodeDetailsJSONWork.java | 25 +- .../cc/work/GetNodeSummariesJSONWork.java | 16 +- .../control/cc/work/GetThreadDumpWork.java | 5 +- .../hyracks/control/cc/work/JobCleanupWork.java | 101 +-- .../hyracks/control/cc/work/JobStartWork.java | 19 +- 
.../cc/work/JobletCleanupNotificationWork.java | 32 +- .../control/cc/work/NotifyDeployBinaryWork.java | 2 +- .../control/cc/work/NotifyShutdownWork.java | 2 +- .../control/cc/work/RegisterNodeWork.java | 31 +- .../work/RegisterPartitionAvailibilityWork.java | 5 +- .../cc/work/RegisterPartitionRequestWork.java | 5 +- .../control/cc/work/RemoveDeadNodesWork.java | 57 +- .../control/cc/work/ReportProfilesWork.java | 14 +- .../control/cc/work/TaskCompleteWork.java | 6 +- .../control/cc/work/TaskFailureWork.java | 6 +- .../control/cc/work/UnregisterNodeWork.java | 14 +- .../cc/work/WaitForJobCompletionWork.java | 42 +- .../control/cc/cluster/NodeManagerTest.java | 165 ++++ .../hyracks/control/cc/job/JobManagerTest.java | 238 ++++++ .../control/common/base/INodeController.java | 3 +- .../control/common/controllers/CCConfig.java | 9 + .../common/controllers/NodeRegistration.java | 10 +- .../common/deployment/DeploymentRun.java | 3 +- .../control/common/ipc/CCNCFunctions.java | 8 +- .../common/ipc/NodeControllerRemoteProxy.java | 4 +- .../control/common/shutdown/ShutdownRun.java | 3 +- .../control/nc/NodeControllerService.java | 9 +- .../apache/hyracks/control/nc/io/IOManager.java | 2 +- .../hyracks/control/nc/work/StartTasksWork.java | 8 +- .../btree/helper/NCApplicationEntryPoint.java | 7 + .../search/AbstractTOccurrenceSearcher.java | 4 +- 120 files changed, 5008 insertions(+), 2187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 90a8599..c4535cf 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -61,9 +61,10 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.util.AppContextInfo; import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; -import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.util.ResourceUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -89,6 +90,7 @@ import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConf import org.apache.hyracks.api.client.IClusterInfoCollector; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -238,8 +240,9 @@ public class APIFramework { int parallelism = getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY), compilerProperties.getParallelism()); - builder.setClusterLocations(parallelism == CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE - ? 
metadataProvider.getClusterLocations() : getComputationLocations(clusterInfoCollector, parallelism)); + AlgebricksAbsolutePartitionConstraint computationLocations = chooseLocations(clusterInfoCollector, parallelism, + metadataProvider.getClusterLocations()); + builder.setClusterLocations(computationLocations); ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter()); if (conf.isOptimize()) { @@ -314,6 +317,14 @@ public class APIFramework { metadataProvider.isWriteTransaction()); JobSpecification spec = compiler.createJob(AppContextInfo.INSTANCE, jobEventListenerFactory); + // When the top-level statement is a query, the statement parameter is null. + if (statement == null) { + // Sets a required capacity, only for read-only queries. + // DDLs and DMLs are considered not that frequent. + spec.setRequiredClusterCapacity(ResourceUtils.getRequiredCompacity(plan, computationLocations, + sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize)); + } + if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) { printPlanPrefix(conf, "Hyracks job"); if (rwQ != null) { @@ -364,54 +375,78 @@ public class APIFramework { } } - // Computes the location constraints based on user-configured parallelism parameter. - // Note that the parallelism parameter is only a hint -- it will not be respected if it is too small or too large. - private AlgebricksAbsolutePartitionConstraint getComputationLocations(IClusterInfoCollector clusterInfoCollector, - int parallelismHint) throws AlgebricksException { + // Chooses the location constraints, i.e., whether to use storage parallelism or use a user-sepcified number + // of cores. 
+ private AlgebricksAbsolutePartitionConstraint chooseLocations(IClusterInfoCollector clusterInfoCollector, + int parallelismHint, AlgebricksAbsolutePartitionConstraint storageLocations) throws AlgebricksException { try { Map<String, NodeControllerInfo> ncMap = clusterInfoCollector.getNodeControllerInfos(); - // Unifies the handling of non-positive parallelism. - int parallelism = parallelismHint <= 0 ? -2 * ncMap.size() : parallelismHint; - - // Calculates per node parallelism, with load balance, i.e., randomly selecting nodes with larger - // parallelism. - int numNodes = ncMap.size(); - int numNodesWithOneMorePartition = parallelism % numNodes; - int perNodeParallelismMin = parallelism / numNodes; - int perNodeParallelismMax = parallelism / numNodes + 1; - List<String> allNodes = new ArrayList<>(); - Set<String> selectedNodesWithOneMorePartition = new HashSet<>(); - for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) { - allNodes.add(entry.getKey()); - } - Random random = new Random(); - for (int index = numNodesWithOneMorePartition; index >= 1; --index) { - int pick = random.nextInt(index); - selectedNodesWithOneMorePartition.add(allNodes.get(pick)); - Collections.swap(allNodes, pick, index - 1); - } + // Gets total number of cores in the cluster. + int totalNumCores = getTotalNumCores(ncMap); - // Generates cluster locations, which has duplicates for a node if it contains more than one partitions. - List<String> locations = new ArrayList<>(); - for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) { - String nodeId = entry.getKey(); - int numCores = entry.getValue().getNumCores(); - int availableCores = numCores > 1 ? numCores - 1 : numCores; // Reserves one core for heartbeat. - int nodeParallelism = selectedNodesWithOneMorePartition.contains(nodeId) ? perNodeParallelismMax - : perNodeParallelismMin; - int coresToUse = nodeParallelism >= 0 && nodeParallelism < availableCores ? 
nodeParallelism - : availableCores; - for (int count = 0; count < coresToUse; ++count) { - locations.add(nodeId); - } + // If storage parallelism is not larger than the total number of cores, we use the storage parallelism. + // Otherwise, we will use all available cores. + if (parallelismHint == CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE + && storageLocations.getLocations().length <= totalNumCores) { + return storageLocations; } - return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); - } catch (Exception e) { + return getComputationLocations(ncMap, parallelismHint); + } catch (HyracksException e) { throw new AlgebricksException(e); } } + // Computes the location constraints based on user-configured parallelism parameter. + // Note that the parallelism parameter is only a hint -- it will not be respected if it is too small or too large. + private AlgebricksAbsolutePartitionConstraint getComputationLocations(Map<String, NodeControllerInfo> ncMap, + int parallelismHint) { + // Unifies the handling of non-positive parallelism. + int parallelism = parallelismHint <= 0 ? -2 * ncMap.size() : parallelismHint; + + // Calculates per node parallelism, with load balance, i.e., randomly selecting nodes with larger + // parallelism. 
+ int numNodes = ncMap.size(); + int numNodesWithOneMorePartition = parallelism % numNodes; + int perNodeParallelismMin = parallelism / numNodes; + int perNodeParallelismMax = parallelism / numNodes + 1; + List<String> allNodes = new ArrayList<>(); + Set<String> selectedNodesWithOneMorePartition = new HashSet<>(); + for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) { + allNodes.add(entry.getKey()); + } + Random random = new Random(); + for (int index = numNodesWithOneMorePartition; index >= 1; --index) { + int pick = random.nextInt(index); + selectedNodesWithOneMorePartition.add(allNodes.get(pick)); + Collections.swap(allNodes, pick, index - 1); + } + + // Generates cluster locations, which has duplicates for a node if it contains more than one partitions. + List<String> locations = new ArrayList<>(); + for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) { + String nodeId = entry.getKey(); + int availableCores = entry.getValue().getNumAvailableCores(); + int nodeParallelism = selectedNodesWithOneMorePartition.contains(nodeId) ? perNodeParallelismMax + : perNodeParallelismMin; + int coresToUse = nodeParallelism >= 0 && nodeParallelism < availableCores ? nodeParallelism + : availableCores; + for (int count = 0; count < coresToUse; ++count) { + locations.add(nodeId); + } + } + return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); + } + + // Gets the total number of available cores in the cluster. + private int getTotalNumCores(Map<String, NodeControllerInfo> ncMap) { + int sum = 0; + for (Map.Entry<String, NodeControllerInfo> entry : ncMap.entrySet()) { + sum += entry.getValue().getNumAvailableCores(); + } + return sum; + } + // Gets the frame limit. 
private int getFrameLimit(String parameter, long memBudgetInConfiguration, int frameSize) { IPropertyInterpreter<Long> longBytePropertyInterpreter = PropertyInterpreters.getLongBytePropertyInterpreter(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java new file mode 100644 index 0000000..3a6bfee --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.app.resource; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; +import org.apache.hyracks.api.job.resource.IClusterCapacity; + +// The current implementation aggregates the memory requirement for each operator. +// TODO(buyingyi): consider stages for calculating the memory requirement. 
+public class RequiredCapacityVisitor implements ILogicalOperatorVisitor<Void, Void> { + + private static final long MAX_BUFFER_PER_CONNECTION = 1L; + + private final long numComputationPartitions; + private final long groupByMemorySize; + private final long joinMemorySize; + private final long sortMemorySize; + private final long frameSize; + private final IClusterCapacity clusterCapacity; + private final Set<ILogicalOperator> visitedOperators = new HashSet<>(); + private long stageMemorySoFar = 0L; + + public RequiredCapacityVisitor(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit, + int joinFrameLimit, int frameSize, IClusterCapacity clusterCapacity) { + this.numComputationPartitions = numComputationPartitions; + this.frameSize = frameSize; + this.groupByMemorySize = groupFrameLimit * (long) frameSize; + this.joinMemorySize = joinFrameLimit * (long) frameSize; + this.sortMemorySize = sortFrameLimit * (long) frameSize; + this.clusterCapacity = clusterCapacity; + this.clusterCapacity.setAggregatedCores(1); // At least one core is needed. 
+ } + + @Override + public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException { + calculateMemoryUsageForBlockingOperators(op, groupByMemorySize); + return null; + } + + @Override + public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException { + calculateMemoryUsageForBlockingOperators(op, joinMemorySize); + return null; + } + + @Override + public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException { + calculateMemoryUsageForBlockingOperators(op, joinMemorySize); + return null; + } + + @Override + public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException { + calculateMemoryUsageForBlockingOperators(op, sortMemorySize); + return null; + } + + @Override + public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitDelegateOperator(DelegateOperator op, Void arg) 
throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + // Makes sure that the downstream of a replicate operator is only visited once. + if (!visitedOperators.contains(op)) { + visitedOperators.add(op); + visitInternal(op, true); + } + return null; + } + + @Override + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + // Makes sure that the downstream of a split operator is only visited once. + if (!visitedOperators.contains(op)) { + visitedOperators.add(op); + visitInternal(op, true); + } + return null; + } + + @Override + public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void 
visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException { + calculateMemoryUsageForExchange(op); + return null; + } + + @Override + public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) + throws AlgebricksException { + visitInternal(op, true); + return null; + } + + @Override + public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException { + visitInternal(op, true); + return null; + } + + // Calculates the memory 
usage for exchange operators. + private void calculateMemoryUsageForExchange(ExchangeOperator op) throws AlgebricksException { + visitInternal(op, false); + IPhysicalOperator physicalOperator = op.getPhysicalOperator(); + PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag(); + if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE + || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) { + addOutputBuffer(op); + return; + } + stageMemorySoFar += 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions + * frameSize; + clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar); + } + + // Calculates the cluster-wide memory usage for blocking activities like group-by, sort, and join. + private void calculateMemoryUsageForBlockingOperators(ILogicalOperator op, long memSize) + throws AlgebricksException { + visitInternal(op, false); + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + stageMemorySoFar += memSize * numComputationPartitions; + } else { + stageMemorySoFar += memSize; + } + clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar); + } + + // Recursively visits input operators of an operator and sets the CPU core usage. + private void visitInternal(ILogicalOperator op, boolean toAddOuputBuffer) throws AlgebricksException { + for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) { + inputOpRef.getValue().accept(this, null); + } + if (toAddOuputBuffer) { + addOutputBuffer(op); + } + setAvailableCores(op); + } + + // Adds output buffer for an operator. + private void addOutputBuffer(ILogicalOperator op) { + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + stageMemorySoFar += frameSize * numComputationPartitions; // every operator needs one output buffer. 
+ } else { + stageMemorySoFar += frameSize; // every operator needs one output buffer. + } + clusterCapacity.setAggregatedMemoryByteSize(stageMemorySoFar); + } + + // Sets the number of available cores + private void setAvailableCores(ILogicalOperator op) { + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + clusterCapacity.setAggregatedCores((int) numComputationPartitions); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index 19c00db..5756e7d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -30,7 +30,6 @@ import javax.servlet.Servlet; import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.api.http.servlet.APIServlet; -import org.apache.asterix.api.http.servlet.FullAPIServlet; import org.apache.asterix.api.http.servlet.ClusterAPIServlet; import org.apache.asterix.api.http.servlet.ClusterCCDetailsAPIServlet; import org.apache.asterix.api.http.servlet.ClusterNodeDetailsAPIServlet; @@ -38,6 +37,7 @@ import org.apache.asterix.api.http.servlet.ConnectorAPIServlet; import org.apache.asterix.api.http.servlet.DDLAPIServlet; import org.apache.asterix.api.http.servlet.DiagnosticsAPIServlet; import org.apache.asterix.api.http.servlet.FeedServlet; +import org.apache.asterix.api.http.servlet.FullAPIServlet; import 
org.apache.asterix.api.http.servlet.QueryAPIServlet; import org.apache.asterix.api.http.servlet.QueryResultAPIServlet; import org.apache.asterix.api.http.servlet.QueryServiceServlet; @@ -47,8 +47,8 @@ import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.api.http.servlet.ShutdownAPIServlet; import org.apache.asterix.api.http.servlet.UpdateAPIServlet; import org.apache.asterix.api.http.servlet.VersionAPIServlet; -import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.cc.CompilerExtensionManager; +import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.config.AsterixExtension; @@ -62,11 +62,13 @@ import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.api.IAsterixStateProxy; import org.apache.asterix.metadata.bootstrap.AsterixStateProxy; import org.apache.asterix.metadata.cluster.ClusterManagerProvider; +import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.util.AppContextInfo; import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.cc.ClusterControllerService; @@ -85,6 +87,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { private static IAsterixStateProxy proxy; protected ICCApplicationContext appCtx; protected CompilerExtensionManager ccExtensionManager; + private IJobCapacityController jobCapacityController; @Override public void 
start(ICCApplicationContext ccAppCtx, String[] args) throws Exception { @@ -131,6 +134,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE); ccAppCtx.setMessageBroker(messageBroker); + + jobCapacityController = new JobCapacityController(controllerService.getResourceManager()); } protected List<AsterixExtension> getExtensions() { @@ -330,6 +335,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); } + @Override + public IJobCapacityController getJobCapacityController() { + return jobCapacityController; + } + public static synchronized void setAsterixStateProxy(IAsterixStateProxy proxy) { CCApplicationEntryPoint.proxy = proxy; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java index 7a4ff13..41d8b0d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -19,6 +19,7 @@ package org.apache.asterix.hyracks.bootstrap; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -93,7 +94,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { } @Override - public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException { + public void notifyNodeFailure(Collection<String> deadNodeIds) throws 
HyracksException { for (String deadNode : deadNodeIds) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + deadNode + " left"); @@ -118,7 +119,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { } } - private void updateProgress(ClusterEventType eventType, Set<String> nodeIds) { + private void updateProgress(ClusterEventType eventType, Collection<String> nodeIds) { List<IClusterManagementWorkResponse> completedResponses = new ArrayList<IClusterManagementWorkResponse>(); boolean isComplete = false; for (IClusterManagementWorkResponse resp : pendingWorkResponses) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index f64f998..d437b5b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.hyracks.bootstrap; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; @@ -59,7 +60,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { } @Override - public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) { + public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds) { setState(ClusterStateManager.INSTANCE.getState()); ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(false); return Collections.emptySet(); 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index 8998c6b..bc270df 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.IPropertiesProvider; @@ -51,6 +52,7 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.application.INCApplicationEntryPoint; +import org.apache.hyracks.api.job.resource.NodeCapacity; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.messages.IMessageBroker; @@ -261,6 +263,19 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { performLocalCleanUp(); } + @Override + public NodeCapacity getCapacity() { + IPropertiesProvider propertiesProvider = (IPropertiesProvider) runtimeContext; + StorageProperties 
storageProperties = propertiesProvider.getStorageProperties(); + // Deducts the reserved buffer cache size and memory component size from the maximum heap size, + // and deducts one core for processing heartbeats. + long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize() + - storageProperties.getMemoryComponentGlobalBudget(); + int allCores = Runtime.getRuntime().availableProcessors(); + int maximumCoresForComputation = allCores > 1 ? allCores - 1 : allCores; + return new NodeCapacity(memorySize, maximumCoresForComputation); + } + private void performLocalCleanUp() { //Delete working area files from failed jobs runtimeContext.getIOManager().deleteWorkspaceFiles(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index d1d7ff7..d785cce 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -28,6 +28,7 @@ import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; public class CCMessageBroker implements ICCMessageBroker { @@ -49,8 +50,8 @@ public class CCMessageBroker implements ICCMessageBroker { @Override public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception { - Map<String, NodeControllerState> nodeMap = ccs.getNodeMap(); - NodeControllerState state = nodeMap.get(nodeId); + 
INodeManager nodeManager = ccs.getNodeManager(); + NodeControllerState state = nodeManager.getNodeControllerState(nodeId); state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java new file mode 100644 index 0000000..50a21bc --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.util; + +import org.apache.asterix.app.resource.RequiredCapacityVisitor; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import org.apache.hyracks.api.job.resource.ClusterCapacity; +import org.apache.hyracks.api.job.resource.IClusterCapacity; + +public class ResourceUtils { + + private ResourceUtils() { + } + + /** + * Calculates the required cluster capacity from a given query plan, the computation locations, + * the operator memory budgets, and frame size. + * + * @param plan, + * a given query plan. + * @param computationLocations, + * the partitions for computation. + * @param sortFrameLimit, + * the frame limit for one sorter partition. + * @param groupFrameLimit, + * the frame limit for one group-by partition. + * @param joinFrameLimit + * the frame limit for one joiner partition. + * @param frameSize + * the frame size used in query execution. + * @return the required cluster capacity for executing the query. + * @throws AlgebricksException + * if the query plan is malformed. + */ + public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan, + AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit, + int joinFrameLimit, int frameSize) + throws AlgebricksException { + // Creates a cluster capacity visitor. + IClusterCapacity clusterCapacity = new ClusterCapacity(); + RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length, + sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity); + + // There could be only one root operator for a top-level query plan. 
+ ILogicalOperator rootOp = plan.getRoots().get(0).getValue(); + rootOp.accept(visitor, null); + return clusterCapacity; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java index a79053e..e041021 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java @@ -27,8 +27,10 @@ import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.Map; +import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IClusterInfoCollector; import org.apache.hyracks.api.client.NodeControllerInfo; import org.junit.Assert; @@ -39,15 +41,16 @@ import junit.extensions.PA; public class APIFrameworkTest { @Test - public void testGetComputationLocations() throws Exception { + public void testChooseLocations() throws Exception { + // Mocks cluster info collector. IClusterInfoCollector clusterInfoCollector = mock(IClusterInfoCollector.class); // Constructs mocked cluster nodes. 
Map<String, NodeControllerInfo> map = new HashMap<>(); NodeControllerInfo nc1Info = mock(NodeControllerInfo.class); - when(nc1Info.getNumCores()).thenReturn(4); + when(nc1Info.getNumAvailableCores()).thenReturn(1); NodeControllerInfo nc2Info = mock(NodeControllerInfo.class); - when(nc2Info.getNumCores()).thenReturn(4); + when(nc2Info.getNumAvailableCores()).thenReturn(1); String nc1 = "nc1"; String nc2 = "nc2"; map.put(nc1, nc1Info); @@ -57,10 +60,56 @@ public class APIFrameworkTest { // Creates an APIFramework. APIFramework apiFramework = new APIFramework(mock(ILangCompilationProvider.class)); + // Tests large storage locations. + AlgebricksAbsolutePartitionConstraint storageLocations = new AlgebricksAbsolutePartitionConstraint( + new String[] { "node1", "node1", "node2" }); + AlgebricksAbsolutePartitionConstraint computationLocations = (AlgebricksAbsolutePartitionConstraint) PA + .invokeMethod(apiFramework, + "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int," + + AlgebricksAbsolutePartitionConstraint.class.getName() + ")", + clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations); + Assert.assertTrue(computationLocations.getLocations().length == 2); + + // Tests suitable storage locations. + storageLocations = new AlgebricksAbsolutePartitionConstraint(new String[] { "node1", "node2" }); + computationLocations = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, + "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int," + + AlgebricksAbsolutePartitionConstraint.class.getName() + ")", + clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations); + Assert.assertTrue(computationLocations.getLocations().length == 2); + + // Tests small storage locations. 
+ storageLocations = new AlgebricksAbsolutePartitionConstraint(new String[] { "node1" }); + computationLocations = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, + "chooseLocations(" + IClusterInfoCollector.class.getName() + ",int," + + AlgebricksAbsolutePartitionConstraint.class.getName() + ")", + clusterInfoCollector, CompilerProperties.COMPILER_PARALLELISM_AS_STORAGE, storageLocations); + Assert.assertTrue(computationLocations.getLocations().length == 1); + + // Verifies the number of calls on clusterInfoCollector.getNodeControllerInfos() in + // APIFramework.chooseLocations(...). + verify(clusterInfoCollector, times(3)).getNodeControllerInfos(); + } + + @Test + public void testGetComputationLocations() throws AlgebricksException { + // Constructs mocked cluster nodes. + Map<String, NodeControllerInfo> map = new HashMap<>(); + NodeControllerInfo nc1Info = mock(NodeControllerInfo.class); + when(nc1Info.getNumAvailableCores()).thenReturn(4); + NodeControllerInfo nc2Info = mock(NodeControllerInfo.class); + when(nc2Info.getNumAvailableCores()).thenReturn(4); + String nc1 = "nc1"; + String nc2 = "nc2"; + map.put(nc1, nc1Info); + map.put(nc2, nc2Info); + + // Creates an APIFramework. + APIFramework apiFramework = new APIFramework(mock(ILangCompilationProvider.class)); + // Tests odd number parallelism. AlgebricksAbsolutePartitionConstraint loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod( - apiFramework, "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", - clusterInfoCollector, 5); + apiFramework, "getComputationLocations(java.util.Map,int)", map, 5); int nc1Count = 0, nc2Count = 0; String[] partitions = loc.getLocations(); for (String partition : partitions) { @@ -78,7 +127,7 @@ public class APIFrameworkTest { // Tests even number parallelism. 
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, - "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 8); + "getComputationLocations(java.util.Map,int)", map, 8); nc1Count = 0; nc2Count = 0; partitions = loc.getLocations(); @@ -93,40 +142,35 @@ public class APIFrameworkTest { Assert.assertTrue(nc1Count > 0); Assert.assertTrue(nc2Count > 0); Assert.assertTrue(Math.abs(nc1Count - nc2Count) == 0); // Tests load balance. - // The maximum parallelism cannot be beyond n *(#core-1), where n is the number of NCs and #core is the number + // The maximum parallelism cannot be beyond n * core, where n is the number of NCs and #core is the number // of cores per NC. - Assert.assertTrue(partitions.length == 6); + Assert.assertTrue(partitions.length == 8); // Tests the case when parallelism is one. loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, - "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 1); + "getComputationLocations(java.util.Map,int)", map, 1); Assert.assertTrue(loc.getLocations().length == 1); // Tests the case when parallelism is a negative. // In this case, the compiler has no idea and falls back to the default setting where all possible cores // are used. loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, - "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, - -100); - Assert.assertTrue(loc.getLocations().length == 6); + "getComputationLocations(java.util.Map,int)", map, -100); + Assert.assertTrue(loc.getLocations().length == 8); // Tests the case when parallelism is -1. // In this case, the compiler has no idea and falls back to the default setting where all possible cores // are used. 
loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, - "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, -1); - Assert.assertTrue(loc.getLocations().length == 6); + "getComputationLocations(java.util.Map,int)", map, -1); + Assert.assertTrue(loc.getLocations().length == 8); // Tests the case when parallelism is zero. // In this case, the compiler has no idea and falls back to the default setting where all possible cores // are used. loc = (AlgebricksAbsolutePartitionConstraint) PA.invokeMethod(apiFramework, - "getComputationLocations(" + IClusterInfoCollector.class.getName() + ",int)", clusterInfoCollector, 0); - Assert.assertTrue(loc.getLocations().length == 6); - - // Verifies the number of calls on clusterInfoCollector.getNodeControllerInfos() in - // APIFramework.getComputationLocations(...). - verify(clusterInfoCollector, times(6)).getNodeControllerInfos(); + "getComputationLocations(java.util.Map,int)", map, 0); + Assert.assertTrue(loc.getLocations().length == 8); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java new file mode 100644 index 0000000..cc18c31 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.resource; + +import java.util.Collections; + +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator; +import org.apache.hyracks.api.job.resource.ClusterCapacity; +import org.apache.hyracks.api.job.resource.IClusterCapacity; +import org.junit.Assert; +import org.junit.Test; + +public class RequiredCapacityVisitorTest { + + private static final long MEMORY_BUDGET = 33554432L; + private static final int FRAME_SIZE = 32768; + private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE); + private static final int PARALLELISM = 10; + + @Test + 
public void testParallelGroupBy() throws AlgebricksException { + IClusterCapacity clusterCapacity = new ClusterCapacity(); + RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity); + + // Constructs a parallel group-by query plan. + GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + ExchangeOperator exchange = new ExchangeOperator(); + exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null)); + GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL); + globalGby.getInputs().add(new MutableObject<>(exchange)); + exchange.getInputs().add(new MutableObject<>(localGby)); + + // Verifies the calculated cluster capacity requirement for the test query plan. + globalGby.accept(visitor, null); + Assert.assertTrue(clusterCapacity.getAggregatedCores() == PARALLELISM); + Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 2 * MEMORY_BUDGET * PARALLELISM + + 2 * FRAME_SIZE * PARALLELISM * PARALLELISM); + } + + @Test + public void testUnPartitionedGroupBy() throws AlgebricksException { + IClusterCapacity clusterCapacity = new ClusterCapacity(); + RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity); + + // Constructs a parallel group-by query plan. + GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + ExchangeOperator exchange = new ExchangeOperator(); + exchange.setPhysicalOperator(new OneToOneExchangePOperator()); + exchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + globalGby.getInputs().add(new MutableObject<>(exchange)); + exchange.getInputs().add(new MutableObject<>(localGby)); + + // Verifies the calculated cluster capacity requirement for the test query plan. 
+ globalGby.accept(visitor, null); + Assert.assertTrue(clusterCapacity.getAggregatedCores() == 1); + Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 2 * MEMORY_BUDGET + FRAME_SIZE); + } + + @Test + public void testParallelJoin() throws AlgebricksException { + IClusterCapacity clusterCapacity = new ClusterCapacity(); + RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity); + + // Constructs a join query plan. + InnerJoinOperator join = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + + // Left child plan of the join. + ExchangeOperator leftChildExchange = new ExchangeOperator(); + leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null)); + InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + join.getInputs().add(new MutableObject<>(leftChildExchange)); + leftChildExchange.getInputs().add(new MutableObject<>(leftChild)); + EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator(); + ets.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + leftChild.getInputs().add(new MutableObject<>(ets)); + leftChild.getInputs().add(new MutableObject<>(ets)); + + // Right child plan of the join. 
+ ExchangeOperator rightChildExchange = new ExchangeOperator(); + rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null)); + GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL); + join.getInputs().add(new MutableObject<>(rightChildExchange)); + rightChildExchange.getInputs().add(new MutableObject<>(rightChild)); + rightChild.getInputs().add(new MutableObject<>(ets)); + + // Verifies the calculated cluster capacity requirement for the test query plan. + join.accept(visitor, null); + Assert.assertTrue(clusterCapacity.getAggregatedCores() == PARALLELISM); + Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 3 * MEMORY_BUDGET * PARALLELISM + + 2 * 2L * PARALLELISM * PARALLELISM * FRAME_SIZE + 3 * FRAME_SIZE * PARALLELISM); + } + + @Test + public void testUnPartitionedJoin() throws AlgebricksException { + IClusterCapacity clusterCapacity = new ClusterCapacity(); + RequiredCapacityVisitor visitor = makeComputationCapacityVisitor(PARALLELISM, clusterCapacity); + + // Constructs a join query plan. + InnerJoinOperator join = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + + // Left child plan of the join. 
+ ExchangeOperator leftChildExchange = new ExchangeOperator(); + leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + leftChildExchange.setPhysicalOperator(new OneToOneExchangePOperator()); + InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + join.getInputs().add(new MutableObject<>(leftChildExchange)); + leftChildExchange.getInputs().add(new MutableObject<>(leftChild)); + EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator(); + ets.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + leftChild.getInputs().add(new MutableObject<>(ets)); + leftChild.getInputs().add(new MutableObject<>(ets)); + + // Right child plan of the join. + ExchangeOperator rightChildExchange = new ExchangeOperator(); + rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + rightChildExchange.setPhysicalOperator(new OneToOneExchangePOperator()); + GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + join.getInputs().add(new MutableObject<>(rightChildExchange)); + rightChildExchange.getInputs().add(new MutableObject<>(rightChild)); + rightChild.getInputs().add(new MutableObject<>(ets)); + + // Verifies the calculated cluster capacity requirement for the test query plan. 
+ join.accept(visitor, null); + Assert.assertTrue(clusterCapacity.getAggregatedCores() == 1); + Assert.assertTrue(clusterCapacity.getAggregatedMemoryByteSize() == 3 * MEMORY_BUDGET + 5L * FRAME_SIZE); + } + + private RequiredCapacityVisitor makeComputationCapacityVisitor(int numComputationPartitions, + IClusterCapacity clusterCapacity) { + return new RequiredCapacityVisitor(numComputationPartitions, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE, + clusterCapacity); + } + + private GroupByOperator makeGroupByOperator(AbstractLogicalOperator.ExecutionMode exeMode) { + GroupByOperator groupByOperator = new GroupByOperator(); + groupByOperator.setExecutionMode(exeMode); + return groupByOperator; + } + + private InnerJoinOperator makeJoinOperator(AbstractLogicalOperator.ExecutionMode exeMode) { + InnerJoinOperator joinOperator = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE)); + joinOperator.setExecutionMode(exeMode); + return joinOperator; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp index 774747e..e0ae61c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/q09/q09.3.query.sqlpp @@ -20,6 +20,9 @@ use tpcds; +set `compiler.joinmemory` "4MB" +set `compiler.groupmemory` "4MB" + select case when (select value count(ss) from store_sales ss where ss_quantity >= 1 and ss_quantity <= 20) > 25437 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java index 3a98b82..fef4e31 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java @@ -18,6 +18,7 @@ package org.apache.asterix.common.api; * specific language governing permissions and limitations * under the License. */ +import java.util.Collection; import java.util.Set; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; @@ -28,7 +29,7 @@ public interface IClusterEventsSubscriber { * @param deadNodeIds * @return */ - public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds); + public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds); /** * @param joinedNodeId http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml b/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml index 9992009..a5ecc6b 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/asterix-configuration.xml @@ -20,7 +20,7 @@ <property> <name>nc.java.opts</name> - <value>-Xmx3096m 
-Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"</value> + <value>-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"</value> <description>JVM parameters for each Node Contoller (NC)</description> </property> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java index 580aab7..34b873c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/RemoveNodeWorkResponse.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.metadata.cluster; +import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -30,7 +31,7 @@ public class RemoveNodeWorkResponse extends ClusterManagementWorkResponse { nodesToBeRemoved.addAll(w.getNodesToBeRemoved()); } - public boolean updateProgress(Set<String> failedNodeIds) { + public boolean updateProgress(Collection<String> failedNodeIds) { nodesToBeRemoved.removeAll(failedNodeIds); return nodesToBeRemoved.isEmpty(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml index 1ccdc76..6458fb0 100644 --- a/asterixdb/asterix-runtime/pom.xml +++ b/asterixdb/asterix-runtime/pom.xml @@ -140,5 +140,16 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + 
<groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java new file mode 100644 index 0000000..8ea1fa7 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.job.resource; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.resource.IClusterCapacity; +import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; +import org.apache.hyracks.control.cc.scheduler.IResourceManager; + +// To avoid the computation cost for checking the capacity constraint for each node, +// currently the admit/allocation decisions are based on the aggregated resource information. +// TODO(buyingyi): investigate partition-aware resource control. +public class JobCapacityController implements IJobCapacityController { + + private final IResourceManager resourceManager; + + public JobCapacityController(IResourceManager resourceManager) { + this.resourceManager = resourceManager; + } + + @Override + public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException { + IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity(); + long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize(); + int reqAggregatedNumCores = requiredCapacity.getAggregatedCores(); + IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity(); + if (!(reqAggregatedMemoryByteSize <= maximumCapacity.getAggregatedMemoryByteSize() + && reqAggregatedNumCores <= maximumCapacity.getAggregatedCores())) { + throw HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, requiredCapacity.toString(), + maximumCapacity.toString()); + } + IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity(); + long currentAggregatedMemoryByteSize = currentCapacity.getAggregatedMemoryByteSize(); + int currentAggregatedAvailableCores = currentCapacity.getAggregatedCores(); + if (!(reqAggregatedMemoryByteSize <= currentAggregatedMemoryByteSize + 
&& reqAggregatedNumCores <= currentAggregatedAvailableCores)) { + return JobSubmissionStatus.QUEUE; + } + currentCapacity.setAggregatedMemoryByteSize(currentAggregatedMemoryByteSize - reqAggregatedMemoryByteSize); + currentCapacity.setAggregatedCores(currentAggregatedAvailableCores - reqAggregatedNumCores); + return JobSubmissionStatus.EXECUTE; + } + + @Override + public void release(JobSpecification job) { + IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity(); + long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize(); + int reqAggregatedNumCores = requiredCapacity.getAggregatedCores(); + IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity(); + long aggregatedMemoryByteSize = currentCapacity.getAggregatedMemoryByteSize(); + int aggregatedNumCores = currentCapacity.getAggregatedCores(); + currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize); + currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java index ed93a2c..608def7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.cluster.INodeManager; /** * Utility class for obtaining information on the set of Hyracks 
NodeController @@ -61,6 +62,7 @@ public class RuntimeUtils { public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) { ClusterControllerService ccs = (ClusterControllerService) AppContextInfo.INSTANCE .getCCApplicationContext().getControllerService(); - map.putAll(ccs.getIpAddressNodeNameMap()); + INodeManager nodeManager = ccs.getNodeManager(); + map.putAll(nodeManager.getIpAddressNodeNameMap()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java new file mode 100644 index 0000000..4a63885 --- /dev/null +++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.job.resource; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.resource.ClusterCapacity; +import org.apache.hyracks.api.job.resource.IClusterCapacity; +import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.control.cc.scheduler.IResourceManager; +import org.apache.hyracks.control.cc.scheduler.ResourceManager; +import org.junit.Assert; +import org.junit.Test; + +public class JobCapacityControllerTest { + + @Test + public void test() throws HyracksException { + IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33); + JobCapacityController capacityController = new JobCapacityController(resourceManager); + + // Verifies the correctness of the allocate method. 
+ Assert.assertTrue(capacityController.allocate( + makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE); + Assert.assertTrue(capacityController.allocate( + makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE); + Assert.assertTrue(capacityController.allocate( + makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE); + + boolean exceedCapacity = false; + try { + capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64)); + } catch (HyracksException e) { + exceedCapacity = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY; + } + Assert.assertTrue(exceedCapacity); + Assert.assertTrue(capacityController.allocate( + makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE); + exceedCapacity = false; + try { + capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33)); + } catch (HyracksException e) { + exceedCapacity = e.getErrorCode() == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY; + } + Assert.assertTrue(exceedCapacity); + + // Verifies that the release method does not leak resource. + capacityController.release(makeJobWithRequiredCapacity(4294967296L, 16)); + Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize() == 4294967296L); + Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedCores() == 33); + } + + private IResourceManager makeResourceManagerWithCapacity(long memorySize, int cores) throws HyracksException { + IResourceManager resourceManager = new ResourceManager(); + resourceManager.update("node1", new NodeCapacity(memorySize, cores)); + return resourceManager; + } + + private JobSpecification makeJobWithRequiredCapacity(long memorySize, int cores) { + // Generates cluster capacity. + IClusterCapacity clusterCapacity = makeComputationCapacity(memorySize, cores); + + // Generates a job. 
+ JobSpecification job = mock(JobSpecification.class); + when(job.getRequiredClusterCapacity()).thenReturn(clusterCapacity); + return job; + } + + private IClusterCapacity makeComputationCapacity(long memorySize, int cores) { + IClusterCapacity clusterCapacity = new ClusterCapacity(); + clusterCapacity.setAggregatedMemoryByteSize(memorySize); + clusterCapacity.setAggregatedCores(cores); + return clusterCapacity; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java index 4a5f90f..83d421f 100644 --- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java +++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java @@ -78,9 +78,11 @@ public class SampleLocalClusterIT { @Test public void test0_startCluster() throws Exception { - Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")) + Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f") .inheritIO().start(); Assert.assertEquals(0, process.waitFor()); + process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start(); + Assert.assertEquals(0, process.waitFor()); } @Test