http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index d01d8fd..a1f9d9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,17 +17,15 @@ */ package org.apache.drill.exec.work.foreman; -import com.codahale.metrics.Counter; -import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.protobuf.InvalidProtocolBufferException; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + import org.apache.drill.common.CatastrophicFailure; import org.apache.drill.common.EventProcessor; import org.apache.drill.common.concurrent.ExtendedLatch; @@ -36,9 +34,6 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.coord.ClusterCoordinator; -import org.apache.drill.exec.coord.DistributedSemaphore; -import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; import 
org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.metrics.DrillMetrics; @@ -73,43 +68,51 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; -import org.apache.drill.exec.util.MemoryAllocationUtilities; import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; +import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentStatusReporter; import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.RootFragmentManager; import org.codehaus.jackson.map.ObjectMapper; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Counter; +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; /** * Foreman manages all 
the fragments (local and remote) for a single query where this * is the driving/root node. * * The flow is as follows: - * - Foreman is submitted as a runnable. - * - Runnable does query planning. - * - state changes from PENDING to RUNNING - * - Runnable sends out starting fragments - * - Status listener are activated - * - The Runnable's run() completes, but the Foreman stays around - * - Foreman listens for state change messages. - * - state change messages can drive the state to FAILED or CANCELED, in which case - * messages are sent to running fragments to terminate - * - when all fragments complete, state change messages drive the state to COMPLETED + * <ul> + * <li>Foreman is submitted as a runnable.</li> + * <li>Runnable does query planning.</li> + * <li>state changes from PENDING to RUNNING</li> + * <li>Runnable sends out starting fragments</li> + * <li>Status listener are activated</li> + * <li>The Runnable's run() completes, but the Foreman stays around</li> + * <li>Foreman listens for state change messages.</li> + * <li>state change messages can drive the state to FAILED or CANCELED, in which case + * messages are sent to running fragments to terminate</li> + * <li>when all fragments complete, state change messages drive the state to COMPLETED</li> + * </ul> */ + public class Foreman implements Runnable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class); private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger"); @@ -136,15 +139,13 @@ public class Foreman implements Runnable { private boolean resume = false; private final ProfileOption profileOption; - private volatile DistributedLease lease; // used to limit the number of concurrent queries + private final QueryResourceManager queryRM; private final ResponseSendListener responseListener = new ResponseSendListener(); private final StateSwitch stateSwitch = new StateSwitch(); private final ForemanResult 
foremanResult = new ForemanResult(); private final ConnectionClosedListener closeListener = new ConnectionClosedListener(); private final ChannelFuture closeFuture; - private final boolean queuingEnabled; - private String queryText; @@ -173,12 +174,9 @@ public class Foreman implements Runnable { queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), drillbitContext.getClusterCoordinator(), this); - final OptionManager optionManager = queryContext.getOptions(); - queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE); - - final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING; - recordNewState(initialState); + recordNewState(QueryState.ENQUEUED); enqueuedQueries.inc(); + queryRM = drillbitContext.getResourceManager().newQueryRM(this); profileOption = setProfileOption(queryContext.getOptions()); } @@ -350,20 +348,6 @@ public class Foreman implements Runnable { */ } - private void releaseLease() { - while (lease != null) { - try { - lease.close(); - lease = null; - } catch (final InterruptedException e) { - // if we end up here, the while loop will try again - } catch (final Exception e) { - logger.warn("Failure while releasing lease.", e); - break; - } - } - } - private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException { LogicalPlan logicalPlan; try { @@ -431,18 +415,17 @@ public class Foreman implements Runnable { private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException { validatePlan(plan); - MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); - //Marking endTime of Planning - queryManager.markPlanningEndTime(); - - if (queuingEnabled) { - acquireQuerySemaphore(plan); - moveToState(QueryState.STARTING, null); - //Marking endTime of Waiting in Queue - queryManager.markQueueWaitEndTime(); - } + queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan); + 
queryRM.visitPhysicalPlan(work); + queryRM.setCost(plan.totalCost()); + queryManager.setTotalCost(plan.totalCost()); + work.applyPlan(drillbitContext.getPlanReader()); + logWorkUnit(work); + admit(work); + queryManager.setQueueName(queryRM.queueName()); + final List<PlanFragment> planFragments = work.getFragments(); final PlanFragment rootPlanFragment = work.getRootFragment(); assert queryId == rootPlanFragment.getHandle().getQueryId(); @@ -461,6 +444,22 @@ public class Foreman implements Runnable { logger.debug("Fragments running."); } + private void admit(QueryWorkUnit work) throws ForemanSetupException { + queryManager.markPlanningEndTime(); + try { + queryRM.admit(); + } catch (QueueTimeoutException e) { + throw UserException + .resourceError() + .message(e.getMessage()) + .build(logger); + } catch (QueryQueueException e) { + throw new ForemanSetupException(e.getMessage(), e); + } + moveToState(QueryState.STARTING, null); + queryManager.markQueueWaitEndTime(); + } + /** * This is a helper method to run query based on the list of PlanFragment that were planned * at some point of time @@ -495,10 +494,8 @@ public class Foreman implements Runnable { } catch (IOException e) { throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson())); } - if (queuingEnabled) { - acquireQuerySemaphore(rootOperator.getCost()); - moveToState(QueryState.STARTING, null); - } + queryRM.setCost(rootOperator.getCost()); + admit(null); drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); @@ -548,62 +545,6 @@ public class Foreman implements Runnable { } } - /** - * This limits the number of "small" and "large" queries that a Drill cluster will run - * simultaneously, if queueing is enabled. If the query is unable to run, this will block - * until it can. 
Beware that this is called under run(), and so will consume a Thread - * while it waits for the required distributed semaphore. - * - * @param plan the query plan - * @throws ForemanSetupException - */ - private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException { - double totalCost = 0; - for (final PhysicalOperator ops : plan.getSortedOperators()) { - totalCost += ops.getCost(); - } - - acquireQuerySemaphore(totalCost); - return; - } - - private void acquireQuerySemaphore(double totalCost) throws ForemanSetupException { - final OptionManager optionManager = queryContext.getOptions(); - final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE); - - final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT); - final String queueName; - - try { - final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator(); - final DistributedSemaphore distributedSemaphore; - - // get the appropriate semaphore - if (totalCost > queueThreshold) { - final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE); - distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue); - queueName = "large"; - } else { - final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE); - distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue); - queueName = "small"; - } - - lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - throw new ForemanSetupException("Unable to acquire slot for query.", e); - } - - if (lease == null) { - throw UserException - .resourceError() - .message( - "Unable to acquire queue resources for query within timeout. 
Timeout for %s queue was set at %d seconds.", - queueName, queueTimeout / 1000) - .build(logger); - } - } - Exception getCurrentException() { return foremanResult.getException(); } @@ -612,54 +553,55 @@ public class Foreman implements Runnable { final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); - final QueryWorkUnit queryWorkUnit = parallelizer.getFragments( + return parallelizer.getFragments( queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment, + queryId, queryContext.getActiveEndpoints(), rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo()); + } - if (logger.isTraceEnabled()) { - final StringBuilder sb = new StringBuilder(); - sb.append("PlanFragments for query "); - sb.append(queryId); + private void logWorkUnit(QueryWorkUnit queryWorkUnit) { + if (! 
logger.isTraceEnabled()) { + return; + } + final StringBuilder sb = new StringBuilder(); + sb.append("PlanFragments for query "); + sb.append(queryId); + sb.append('\n'); + + final List<PlanFragment> planFragments = queryWorkUnit.getFragments(); + final int fragmentCount = planFragments.size(); + int fragmentIndex = 0; + for(final PlanFragment planFragment : planFragments) { + final FragmentHandle fragmentHandle = planFragment.getHandle(); + sb.append("PlanFragment("); + sb.append(++fragmentIndex); + sb.append('/'); + sb.append(fragmentCount); + sb.append(") major_fragment_id "); + sb.append(fragmentHandle.getMajorFragmentId()); + sb.append(" minor_fragment_id "); + sb.append(fragmentHandle.getMinorFragmentId()); sb.append('\n'); - final List<PlanFragment> planFragments = queryWorkUnit.getFragments(); - final int fragmentCount = planFragments.size(); - int fragmentIndex = 0; - for(final PlanFragment planFragment : planFragments) { - final FragmentHandle fragmentHandle = planFragment.getHandle(); - sb.append("PlanFragment("); - sb.append(++fragmentIndex); - sb.append('/'); - sb.append(fragmentCount); - sb.append(") major_fragment_id "); - sb.append(fragmentHandle.getMajorFragmentId()); - sb.append(" minor_fragment_id "); - sb.append(fragmentHandle.getMinorFragmentId()); - sb.append('\n'); - - final DrillbitEndpoint endpointAssignment = planFragment.getAssignment(); - sb.append(" DrillbitEndpoint address "); - sb.append(endpointAssignment.getAddress()); - sb.append('\n'); - - String jsonString = "<<malformed JSON>>"; - sb.append(" fragment_json: "); - final ObjectMapper objectMapper = new ObjectMapper(); - try - { - final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); - jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json); - } catch(final Exception e) { - // we've already set jsonString to a fallback value - } - sb.append(jsonString); + final DrillbitEndpoint endpointAssignment = 
planFragment.getAssignment(); + sb.append(" DrillbitEndpoint address "); + sb.append(endpointAssignment.getAddress()); + sb.append('\n'); - logger.trace(sb.toString()); + String jsonString = "<<malformed JSON>>"; + sb.append(" fragment_json: "); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); + jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json); + } catch(final Exception e) { + // we've already set jsonString to a fallback value } - } + sb.append(jsonString); - return queryWorkUnit; + logger.trace(sb.toString()); + } } /** @@ -897,7 +839,7 @@ public class Foreman implements Runnable { runningQueries.dec(); completedQueries.inc(); try { - releaseLease(); + queryRM.exit(); } finally { isClosed = true; } @@ -953,7 +895,7 @@ public class Foreman implements Runnable { foremanResult.setCompleted(QueryState.CANCELED); /* * We don't close the foremanResult until we've gotten - * acknowledgements, which happens below in the case for current state + * acknowledgments, which happens below in the case for current state * == CANCELLATION_REQUESTED. */ return;
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index ecbccf3..216a80d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -36,7 +36,6 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.SchemaUserBitShared; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile; import org.apache.drill.exec.proto.UserBitShared.QueryId; @@ -99,6 +98,15 @@ public class QueryManager implements AutoCloseable { // Is the query saved in transient store private boolean inTransientStore; + /** + * Total query cost. This value is used to place the query into a queue + * and so has meaning to the user who wants to predict queue placement. + */ + + private double totalCost; + + private String queueName; + public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider, final ClusterCoordinator coordinator, final Foreman foreman) { this.queryId = queryId; @@ -191,6 +199,7 @@ public class QueryManager implements AutoCloseable { * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly. 
*/ void cancelExecutingFragments(final DrillbitContext drillbitContext) { + @SuppressWarnings("resource") final Controller controller = drillbitContext.getController(); for(final FragmentData data : fragmentDataSet) { switch(data.getState()) { @@ -219,6 +228,7 @@ public class QueryManager implements AutoCloseable { * sending any message. Resume all fragments through the control tunnel. */ void unpauseExecutingFragments(final DrillbitContext drillbitContext) { + @SuppressWarnings("resource") final Controller controller = drillbitContext.getController(); for(final FragmentData data : fragmentDataSet) { final DrillbitEndpoint endpoint = data.getEndpoint(); @@ -318,6 +328,8 @@ public class QueryManager implements AutoCloseable { .setUser(foreman.getQueryContext().getQueryUserName()) .setForeman(foreman.getQueryContext().getCurrentEndpoint()) .setStart(startTime) + .setTotalCost(totalCost) + .setQueueName(queueName == null ? "-" : queueName) .setOptionsJson(getQueryOptionsAsJson()); if (queryText != null) { @@ -332,7 +344,6 @@ public class QueryManager implements AutoCloseable { } private QueryProfile getQueryProfile(UserException ex) { - final String queryText = foreman.getQueryText(); final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder() .setUser(foreman.getQueryContext().getQueryUserName()) .setType(runQuery.getType()) @@ -345,6 +356,8 @@ public class QueryManager implements AutoCloseable { .setQueueWaitEnd(queueWaitEndTime) .setTotalFragments(fragmentDataSet.size()) .setFinishedFragments(finishedFragments.get()) + .setTotalCost(totalCost) + .setQueueName(queueName == null ? 
"-" : queueName) .setOptionsJson(getQueryOptionsAsJson()); if (ex != null) { @@ -360,6 +373,7 @@ public class QueryManager implements AutoCloseable { profileBuilder.setPlan(planText); } + final String queryText = foreman.getQueryText(); if (queryText != null) { profileBuilder.setQuery(queryText); } @@ -392,7 +406,6 @@ public class QueryManager implements AutoCloseable { profileBuilder.addFragmentProfile(builder); return true; } - } private class InnerIter implements IntObjectPredicate<FragmentData> { @@ -407,7 +420,6 @@ public class QueryManager implements AutoCloseable { builder.addMinorFragmentProfile(data.getProfile()); return true; } - } void setPlanText(final String planText) { @@ -430,6 +442,14 @@ public class QueryManager implements AutoCloseable { queueWaitEndTime = System.currentTimeMillis(); } + public void setTotalCost(double totalCost) { + this.totalCost = totalCost; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + /** * Internal class used to track the number of pending completion messages required from particular node. This allows * to know for each node that is part of this query, what portion of fragments are still outstanding. 
In the case that @@ -536,7 +556,7 @@ public class QueryManager implements AutoCloseable { return drillbitStatusListener; } - private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){ + private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() { @Override public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) { http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java new file mode 100644 index 0000000..9bcadda --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.DrillbitContext; + +/** + * Abstract base class for a resource manager. Handles tasks common to all + * resource managers: learning the resources available on this Drillbit. + * In the current version, Drillbits must be symmetrical, so that knowing + * the resources on one node is sufficient to know resources available on + * all nodes. + */ + +public abstract class AbstractResourceManager implements ResourceManager { + + protected final DrillbitContext context; + private final long memoryPerNode; + private final int cpusPerNode; + + public AbstractResourceManager(final DrillbitContext context) { + this.context = context; + DrillConfig config = context.getConfig(); + + // Normally we use the actual direct memory configured on the JVM command + // line. However, if the config param is set, we use that instead (if it is + // lower than actual memory). Primarily for testing. + + long memLimit = DrillConfig.getMaxDirectMemory(); + long configMemoryPerNode = config.getBytes(ExecConstants.MAX_MEMORY_PER_NODE); + if (configMemoryPerNode > 0) { + memLimit = Math.min(memLimit, configMemoryPerNode); + } + memoryPerNode = memLimit; + + // Do the same for CPUs. 
+ + int cpuLimit = Runtime.getRuntime().availableProcessors(); + int configCpusPerNode = config.getInt(ExecConstants.MAX_CPUS_PER_NODE); + if (configCpusPerNode > 0) { + cpuLimit = Math.min(cpuLimit, configCpusPerNode); + } + cpusPerNode = cpuLimit; + } + + @Override + public long memoryPerNode() { return memoryPerNode; } + + @Override + public int cpusPerNode() { return cpusPerNode; } +} http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java new file mode 100644 index 0000000..c28fbbb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. + */ + +public class DefaultResourceManager implements ResourceManager { + + public static class DefaultResourceAllocator implements QueryResourceAllocator { + + private QueryContext queryContext; + + protected DefaultResourceAllocator(QueryContext queryContext) { + this.queryContext = queryContext; + } + + @Override + public void visitAbstractPlan(PhysicalPlan plan) { + if (plan == null || plan.getProperties().hasResourcePlan) { + return; + } + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); + } + + @Override + public void visitPhysicalPlan(QueryWorkUnit work) { + } + } + + public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { + + @SuppressWarnings("unused") + private final DefaultResourceManager rm; + + public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) { + super(foreman.getQueryContext()); + this.rm = rm; + } + + @Override + public void setCost(double cost) { + // Nothing to do by default. 
+ } + + @Override + public void admit() { + // No queueing by default + } + + @Override + public void exit() { + // No queueing by default + } + + @Override + public boolean hasQueue() { return false; } + + @Override + public String queueName() { return null; } + } + + public final long memoryPerNode; + public final int cpusPerNode; + + public DefaultResourceManager() { + memoryPerNode = DrillConfig.getMaxDirectMemory(); + + // Note: CPUs are not yet used, they will be used in a future + // enhancement. + + cpusPerNode = Runtime.getRuntime().availableProcessors(); + } + + @Override + public long memoryPerNode() { return memoryPerNode; } + + @Override + public int cpusPerNode() { return cpusPerNode; } + + @Override + public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) { + return new DefaultResourceAllocator(queryContext); + } + + @Override + public QueryResourceManager newQueryRM(final Foreman foreman) { + return new DefaultQueryResourceManager(this, foreman); + } + + @Override + public void close() { } +} http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java new file mode 100644 index 0000000..9a4c78d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. + * <p> + * This queue is configured using system options: + * <dl> + * <dt><tt>exec.queue.enable</tt> + * </dt> + * <dd>Set to true to enable the distributed queue.</dd> + * <dt><tt>exec.queue.large</tt> + * </dt> + * <dd>The maximum number of large queries to admit. Additional + * queries wait in the queue.</dd> + * <dt><tt>exec.queue.small</tt> + * </dt> + * <dd>The maximum number of small queries to admit.
Additional + * queries wait in the queue.</dd> + * <dt><tt>exec.queue.threshold</tt> + * </dt> + * <dd>The cost threshold. Queries below this size are small, at + * or above this size are large.</dd> + * <dt><tt>exec.queue.timeout_millis</tt> + * </dt> + * <dd>The maximum time (in milliseconds) a query will wait in the + * queue before failing.</dd> + * </dl> + * <p> + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. + */ + +public class DistributedQueryQueue implements QueryQueue { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class); + + private class DistributedQueueLease implements QueueLease { + private final QueryId queryId; + private DistributedLease lease; + private final String queueName; + + /** + * Memory allocated to the query. Though all queries in the queue use + * the same memory allocation rules, those rules can change at any time + * as the user changes system options. This value captures the value + * calculated at the time that this lease was granted. + */ + private long queryMemory; + + public DistributedQueueLease(QueryId queryId, String queueName, + DistributedLease lease, long queryMemory) { + this.queryId = queryId; + this.queueName = queueName; + this.lease = lease; + this.queryMemory = queryMemory; + } + + @Override + public String toString() { + return String.format("Lease for %s queue to query %s", + queueName, QueryIdHelper.getQueryId(queryId)); + } + + @Override + public long queryMemoryPerNode() { return queryMemory; } + + @Override + public void release() { + DistributedQueryQueue.this.release(this); + } + + @Override + public String queueName() { return queueName; } + } + + /** + * Exposes a snapshot of internal state information for use in status + * reporting, such as in the UI.
+ */ + + @XmlRootElement + public static class ZKQueueInfo { + public final int smallQueueSize; + public final int largeQueueSize; + public final double queueThreshold; + public final long memoryPerNode; + public final long memoryPerSmallQuery; + public final long memoryPerLargeQuery; + + public ZKQueueInfo(DistributedQueryQueue queue) { + smallQueueSize = queue.configSet.smallQueueSize; + largeQueueSize = queue.configSet.largeQueueSize; + queueThreshold = queue.configSet.queueThreshold; + memoryPerNode = queue.memoryPerNode; + memoryPerSmallQuery = queue.memoryPerSmallQuery; + memoryPerLargeQuery = queue.memoryPerLargeQuery; + } + } + + public interface StatusAdapter { + boolean inShutDown(); + } + + /** + * Holds runtime configuration options. Allows polling the options + * for changes, and easily detecting changes. + */ + + private static class ConfigSet { + private final long queueThreshold; + private final long queueTimeout; + private final int largeQueueSize; + private final int smallQueueSize; + private final double largeToSmallRatio; + private final double reserveMemoryRatio; + private final long minimumOperatorMemory; + + public ConfigSet(SystemOptionManager optionManager) { + queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE); + queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT); + + // Option manager supports only long values, but we do not expect + // more than 2 billion active queries, so queue size is stored as + // an int. + // TODO: Once DRILL-5832 is available, add an getInt() method to + // the option system to get the value as an int and do a range + // check. 
+ + largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE); + smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE); + largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO); + reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE); + minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP); + } + + /** + * Determine if this config set is the same as another one. Detects + * whether the configuration has changed between one sync point and + * another. + * <p> + * Note that we cannot use <tt>equals()</tt> here as, according to + * Drill practice, <tt>equals()</tt> is for use in collections and + * must be accompanied by a <tt>hashCode()</tt> method. Since this + * class will never be used in a collection, and does not need a + * hash function, we cannot use <tt>equals()</tt>. + * + * @param otherSet another snapshot taken at another time + * @return true if this configuration is the same as another + * (no update between the two snapshots), false if the config has + * changed between the snapshots + */ + + public boolean isSameAs(ConfigSet otherSet) { + return queueThreshold == otherSet.queueThreshold && + queueTimeout == otherSet.queueTimeout && + largeQueueSize == otherSet.largeQueueSize && + smallQueueSize == otherSet.smallQueueSize && + largeToSmallRatio == otherSet.largeToSmallRatio && + reserveMemoryRatio == otherSet.reserveMemoryRatio && + minimumOperatorMemory == otherSet.minimumOperatorMemory; + } + } + + private long memoryPerNode; + private SystemOptionManager optionManager; + private ConfigSet configSet; + private ClusterCoordinator clusterCoordinator; + private long nextRefreshTime; + private long memoryPerSmallQuery; + private long memoryPerLargeQuery; + private final StatusAdapter statusAdapter; + + public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) { + statusAdapter = adapter; + optionManager = 
context.getOptionManager(); + clusterCoordinator = context.getClusterCoordinator(); + } + + @Override + public void setMemoryPerNode(long memoryPerNode) { + this.memoryPerNode = memoryPerNode; + refreshConfig(); + } + + @Override + public long defaultQueryMemoryPerNode(double cost) { + return (cost < configSet.queueThreshold) + ? memoryPerSmallQuery + : memoryPerLargeQuery; + } + + @Override + public long minimumOperatorMemory() { return configSet.minimumOperatorMemory; } + + /** + * This limits the number of "small" and "large" queries that a Drill cluster will run + * simultaneously, if queuing is enabled. If the query is unable to run, this will block + * until it can. Beware that this is called under run(), and so will consume a thread + * while it waits for the required distributed semaphore. + * + * @param queryId query identifier + * @param cost the query plan + * @throws QueryQueueException if the underlying ZK queuing mechanism fails + * @throws QueueTimeoutException if the query waits too long in the + * queue + */ + + @SuppressWarnings("resource") + @Override + public QueueLease enqueue(QueryId queryId, double cost) throws QueryQueueException, QueueTimeoutException { + final String queueName; + DistributedLease lease = null; + long queryMemory; + final DistributedSemaphore distributedSemaphore; + try { + + // Only the refresh and queue computation is synchronized. 
+ + synchronized(this) { + refreshConfig(); + + // get the appropriate semaphore + if (cost >= configSet.queueThreshold) { + distributedSemaphore = clusterCoordinator.getSemaphore("query.large", configSet.largeQueueSize); + queueName = "large"; + queryMemory = memoryPerLargeQuery; + } else { + distributedSemaphore = clusterCoordinator.getSemaphore("query.small", configSet.smallQueueSize); + queueName = "small"; + queryMemory = memoryPerSmallQuery; + } + } + logger.debug("Query {} with cost {} placed into the {} queue.", + QueryIdHelper.getQueryId(queryId), cost, queueName); + + lease = distributedSemaphore.acquire(configSet.queueTimeout, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + logger.error("Unable to acquire slot for query " + + QueryIdHelper.getQueryId(queryId), e); + throw new QueryQueueException("Unable to acquire slot for query.", e); + } + + if (lease == null) { + int timeoutSecs = (int) Math.round(configSet.queueTimeout/1000.0); + logger.warn("Queue timeout: {} after {} seconds.", queueName, timeoutSecs); + throw new QueueTimeoutException(queryId, queueName, timeoutSecs); + } + return new DistributedQueueLease(queryId, queueName, lease, queryMemory); + } + + private synchronized void refreshConfig() { + long now = System.currentTimeMillis(); + if (now < nextRefreshTime) { + return; + } + nextRefreshTime = now + 5000; + + // Only update numbers, and log changes, if something + // actually changes. + + ConfigSet newSet = new ConfigSet(optionManager); + if (newSet.isSameAs(configSet)) { + return; + } + + configSet = newSet; + + // Divide up memory between queues using admission rate + // to give more memory to larger queries and less to + // smaller queries. We assume that large queries are + // larger than small queries by a factor of + // largeToSmallRatio. 
+ + double totalUnits = configSet.largeToSmallRatio * configSet.largeQueueSize + configSet.smallQueueSize; + double availableMemory = Math.round(memoryPerNode * (1.0 - configSet.reserveMemoryRatio)); + double memoryUnit = availableMemory / totalUnits; + memoryPerLargeQuery = Math.round(memoryUnit * configSet.largeToSmallRatio); + memoryPerSmallQuery = Math.round(memoryUnit); + + logger.debug("Memory config: total memory per node = {}, available: {}, large/small memory ratio = {}", + memoryPerNode, availableMemory, configSet.largeToSmallRatio); + logger.debug("Reserve memory ratio: {}, bytes: {}", + configSet.reserveMemoryRatio, memoryPerNode - availableMemory); + logger.debug("Minimum operator memory: {}", configSet.minimumOperatorMemory); + logger.debug("Small queue: {} slots, {} bytes per slot", + configSet.smallQueueSize, memoryPerSmallQuery); + logger.debug("Large queue: {} slots, {} bytes per slot", + configSet.largeQueueSize, memoryPerLargeQuery); + logger.debug("Cost threshold: {}, timeout: {} ms.", + configSet.queueThreshold, configSet.queueTimeout); + } + + @Override + public boolean enabled() { return true; } + + public synchronized ZKQueueInfo getInfo() { + refreshConfig(); + return new ZKQueueInfo(this); + } + + private void release(QueueLease lease) { + DistributedQueueLease theLease = (DistributedQueueLease) lease; + for (;;) { + try { + theLease.lease.close(); + theLease.lease = null; + break; + } catch (final InterruptedException e) { + // if we end up here, the loop will try again + } catch (final Exception e) { + logger.warn("Failure while releasing lease.", e); + break; + } + if (inShutdown()) { + logger.warn("In shutdown mode: abandoning attempt to release lease"); + } + } + } + + private boolean inShutdown() { + if (statusAdapter == null) { + return false; + } + return statusAdapter.inShutDown(); + } + + @Override + public void close() { + // Nothing to do. 
+ } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java new file mode 100644 index 0000000..473401f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.drill.exec.work.foreman.rm;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;

/**
 * Wrapper around the default and/or distributed resource managers
 * to allow dynamically enabling and disabling queueing.
 * <p>
 * The <tt>exec.queue.enable</tt> option is re-read at most once every
 * {@link #recheckDelayMs} milliseconds. All access to the active
 * resource manager goes through this object's lock.
 */
public class DynamicResourceManager implements ResourceManager {

  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);

  private final DrillbitContext context;
  private ResourceManager defaultRm;
  private ResourceManager queueingRm;
  private ResourceManager activeRm;
  public long nextUpdateTime;
  public final int recheckDelayMs = 5000;

  public DynamicResourceManager(final DrillbitContext context) {
    this.context = context;
    refreshRM();
  }

  /**
   * Returns the resource manager currently in effect, re-checking the
   * queue-enable option first if the recheck delay has elapsed.
   *
   * @return the active resource manager
   */
  public synchronized ResourceManager activeRM() {
    refreshRM();
    return activeRm;
  }

  // Synchronized so the read of activeRm is ordered with the writes made
  // under the same lock by refreshRM() and close().
  @Override
  public synchronized long memoryPerNode() {
    return activeRm.memoryPerNode();
  }

  // Synchronized for the same reason as memoryPerNode().
  @Override
  public synchronized int cpusPerNode() {
    return activeRm.cpusPerNode();
  }

  @Override
  public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
    refreshRM();
    return activeRm.newResourceAllocator(queryContext);
  }

  @Override
  public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
    refreshRM();
    return activeRm.newQueryRM(foreman);
  }

  /**
   * Re-reads the queue-enable option, at most once per recheck delay, and
   * switches the active resource manager to match. Each delegate is
   * created lazily the first time it is needed and then reused.
   */
  private void refreshRM() {
    long now = System.currentTimeMillis();
    if (now < nextUpdateTime) {
      return;
    }
    nextUpdateTime = now + recheckDelayMs;
    @SuppressWarnings("resource")
    SystemOptionManager systemOptions = context.getOptionManager();
    if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
      if (queueingRm == null) {
        StatusAdapter statusAdapter = new StatusAdapter() {
          @Override
          public boolean inShutDown() {
            // Drill provides no shutdown state at present.
            // TODO: Once DRILL-4286 (graceful shutdown) is merged, use the
            // new Drillbit status to determine when the Drillbit
            // is shutting down.
            return false;
          }
        };
        queueingRm = new ThrottledResourceManager(context,
            new DistributedQueryQueue(context, statusAdapter));
      }
      if (activeRm != queueingRm) {
        logger.debug("Enabling ZK-based query queue.");
        activeRm = queueingRm;
      }
    } else {
      if (defaultRm == null) {
        defaultRm = new DefaultResourceManager();
      }
      if (activeRm != defaultRm) {
        logger.debug("Disabling ZK-based query queue.");
        activeRm = defaultRm;
      }
    }
  }

  /**
   * Closes both delegate resource managers. Both are always attempted; the
   * first failure is rethrown, with any later failure attached to it as a
   * suppressed exception.
   */
  @Override
  public synchronized void close() {
    RuntimeException ex = null;
    try {
      if (defaultRm != null) {
        defaultRm.close();
      }
    } catch (RuntimeException e) {
      ex = e;
    } finally {
      defaultRm = null;
    }
    try {
      if (queueingRm != null) {
        queueingRm.close();
      }
    } catch (RuntimeException e) {
      if (ex == null) {
        ex = e;
      } else {
        // Keep the second failure visible instead of dropping it.
        ex.addSuppressed(e);
      }
    } finally {
      queueingRm = null;
    }
    activeRm = null;
    if (ex == null) {
      return;
    } else if (ex instanceof UserException) {
      throw (UserException) ex;
    } else {
      throw UserException.systemError(ex)
        .addContext("Failure closing resource managers.")
        .build(logger);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java new file mode 100644 index 0000000..a042f8b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.drill.exec.work.foreman.rm;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;

import com.google.common.annotations.VisibleForTesting;

/**
 * Query queue to be used in an embedded Drillbit. This queue has scope of only
 * the one Drillbit (not even multiple Drillbits in the same process.) Primarily
 * intended for testing, but may possibly be useful for other embedded
 * applications.
 * <p>
 * Configuration is via config parameters (not via system options as for the
 * distributed queue.)
 * <dl>
 * <dt><tt>drill.exec.queue.embedded.enable</tt></dt>
 * <dd>Set to true to enable the embedded queue. But, this setting has effect
 * only if the Drillbit is, in fact, embedded.</dd>
 * <dt><tt>drill.exec.queue.embedded.size</tt></dt>
 * <dd>The number of active queries, all others queue. There is no upper limit
 * on the number of queued entries.</dd>
 * <dt><tt>drill.exec.queue.embedded.timeout_ms</tt></dt>
 * <dd>The maximum time a query will wait in the queue before failing.</dd>
 * </dl>
 */
public class EmbeddedQueryQueue implements QueryQueue {

  // Config keys. Declared final so that no caller can reassign them.
  public static final String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
  public static final String ENABLED = EMBEDDED_QUEUE + ".enable";
  public static final String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
  public static final String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";

  /**
   * Lease for one permit of the local semaphore, carrying the memory
   * amount computed when the lease was granted.
   */
  public class EmbeddedQueueLease implements QueueLease {

    private final QueryId queryId;
    private boolean released;
    private final long queryMemory;

    public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
      this.queryId = queryId;
      this.queryMemory = queryMemory;
    }

    @Override
    public String toString( ) {
      return new StringBuilder()
          .append("Embedded queue lease for ")
          .append(QueryIdHelper.getQueryId(queryId))
          .append(released ? " (released)" : "")
          .toString();
    }

    @Override
    public long queryMemoryPerNode() {
      return queryMemory;
    }

    @Override
    public void release() {
      EmbeddedQueryQueue.this.release(this);
      released = true;
    }

    @VisibleForTesting
    boolean isReleased() { return released; }

    @Override
    public String queueName() { return "local-queue"; }
  }

  private final int queueTimeoutMs;
  private final int queueSize;
  private final Semaphore semaphore;
  private long memoryPerQuery;
  private final long minimumOperatorMemory;

  public EmbeddedQueryQueue(DrillbitContext context) {
    DrillConfig config = context.getConfig();
    queueTimeoutMs = config.getInt(TIMEOUT_MS);
    queueSize = config.getInt(QUEUE_SIZE);
    // Fair semaphore: the second argument requests first-in, first-out
    // granting of permits.
    semaphore = new Semaphore(queueSize, true);
    minimumOperatorMemory = context.getOptionManager()
        .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
  }

  @Override
  public boolean enabled() { return true; }

  /**
   * Splits the node memory evenly across the configured number of
   * concurrent queries.
   *
   * @param memoryPerNode memory, in bytes, available on the node
   */
  @Override
  public void setMemoryPerNode(long memoryPerNode) {
    // A configured size of zero admits no queries; avoid dividing by zero
    // in that case.
    memoryPerQuery = memoryPerNode / Math.max(1, queueSize);
  }

  @Override
  public long defaultQueryMemoryPerNode(double cost) {
    return memoryPerQuery;
  }

  /**
   * Waits for a free slot in the local queue.
   *
   * @param queryId the query ID
   * @param cost the cost of the query; not used by this queue
   * @return the lease for the acquired slot
   * @throws QueueTimeoutException if no slot became free within the
   * configured timeout
   * @throws QueryQueueException if the wait was interrupted
   */
  @Override
  public QueueLease enqueue(QueryId queryId, double cost)
      throws QueueTimeoutException, QueryQueueException {
    try {
      if (! semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS) ) {
        throw new QueueTimeoutException(queryId, "embedded", queueTimeoutMs);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can
      // still observe the interruption.
      Thread.currentThread().interrupt();
      throw new QueryQueueException("Interrupted", e);
    }
    return new EmbeddedQueueLease(queryId, memoryPerQuery);
  }

  private void release(EmbeddedQueueLease lease) {
    assert ! lease.released;
    semaphore.release();
  }

  @Override
  public void close() {
    assert semaphore.availablePermits() == queueSize;
  }

  @Override
  public long minimumOperatorMemory() {
    return minimumOperatorMemory;
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java new file mode 100644 index 0000000..72dc2d6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.drill.exec.work.foreman.rm;

import org.apache.drill.exec.proto.UserBitShared.QueryId;

/**
 * Interface which defines a queue implementation for query queues.
 * Implementations can queue locally, queue distributed, or do
 * nothing at all.
 * <p>
 * A queue can report itself as enabled or disabled. When enabled,
 * all queries must obtain a lease prior to starting execution. The
 * lease must be released at the completion of execution.
 */
public interface QueryQueue {

  /**
   * The opaque lease returned once a query is admitted
   * for execution.
   */
  public interface QueueLease {

    /**
     * @return the memory, in bytes, granted to the query on each node
     */
    long queryMemoryPerNode();

    /**
     * Release a query lease obtained from
     * {@link QueryQueue#enqueue(QueryId, double)}.
     * Should be called by the per-query resource manager.
     */
    void release();

    /**
     * @return the name of the queue that issued this lease
     */
    String queueName();
  }

  /**
   * Exception thrown if a query exceeds the configured wait time
   * in the query queue.
   */
  @SuppressWarnings("serial")
  public class QueueTimeoutException extends Exception {

    private final QueryId queryId;
    private final String queueName;
    private final int timeoutMs;

    /**
     * @param queryId the query that timed out
     * @param queueName name of the queue in which the query waited
     * @param timeoutMs the wait time, in milliseconds, that was exceeded
     */
    public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) {
      super( String.format(
          "Query timed out of the %s queue after %d ms.",
          queueName, timeoutMs ));
      this.queryId = queryId;
      this.queueName = queueName;
      this.timeoutMs = timeoutMs;
    }

    public QueryId queryId() { return queryId; }
    public String queueName() { return queueName; }
    public int timeoutMs() { return timeoutMs; }
  }

  /**
   * Exception thrown for all non-timeout error conditions.
   */
  @SuppressWarnings("serial")
  public class QueryQueueException extends Exception {
    QueryQueueException(String msg, Exception e) {
      super(msg, e);
    }
  }

  /**
   * Tell the queue how much memory each node has available.
   *
   * @param memoryPerNode memory per node, in bytes
   */
  void setMemoryPerNode(long memoryPerNode);

  /**
   * Return the amount of memory per node when creating a EXPLAIN
   * query plan. Plans to be executed should get the query memory from
   * the lease, as the lease may adjust the default amount on a per-query
   * basis. This means that the memory used to execute the query may
   * differ from the amount shown in an EXPLAIN plan.
   *
   * @param cost the cost of the query
   * @return assumed memory per node, in bytes, to use when creating
   * an EXPLAIN plan
   */
  long defaultQueryMemoryPerNode(double cost);

  /**
   * Optional floor on the amount of memory assigned per operator.
   * This ensures that operators receive a certain amount, separate from
   * any memory slicing. This can oversubscribe node memory if used
   * incorrectly.
   *
   * @return minimum per-operator memory, in bytes
   */
  long minimumOperatorMemory();

  /**
   * Determine if the queue is enabled.
   * @return true if the queue is enabled, false otherwise.
   */
  boolean enabled();

  /**
   * Queue a query. The method returns only when the query is admitted for
   * execution. As a result, the calling thread may block up to the configured
   * wait time.
   * @param queryId the query ID
   * @param cost the cost of the query used for cost-based queueing
   * @return the query lease, on which {@link QueueLease#release()} must be
   * called upon query completion
   * @throws QueueTimeoutException if the query times out waiting to be
   * admitted.
   * @throws QueryQueueException for any other error condition.
   */
  QueueLease enqueue(QueryId queryId, double cost) throws QueueTimeoutException, QueryQueueException;

  void close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java new file mode 100644 index 0000000..35dbe59 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.drill.exec.work.foreman.rm;

import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.work.QueryWorkUnit;

/**
 * Manages resources for an individual query in conjunction with the
 * global {@link ResourceManager}. Handles memory and CPU allocation.
 * Instances of this class handle query planning and are used when the
 * client wants to plan the query, but not execute it. An implementation
 * of {@link QueryResourceManager} is used to both plan the query and
 * queue it for execution.
 * <p>
 * This interface allows a variety of resource management strategies to
 * exist for different purposes.
 * <p>
 * The methods here assume external synchronization: a single query calls
 * the methods at known times; there are no concurrent calls.
 */
public interface QueryResourceAllocator {

  /**
   * Make any needed adjustments to the query plan before parallelization.
   *
   * @param plan the physical plan for the query, prior to parallelization
   */
  void visitAbstractPlan(PhysicalPlan plan);

  /**
   * Provide the manager with the physical plan and node assignments
   * for the query to be run. This class will plan memory for the query.
   *
   * @param work the work unit for the query to be run
   */
  void visitPhysicalPlan(QueryWorkUnit work);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java new file mode 100644 index 0000000..9e2a3a1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.drill.exec.work.foreman.rm;

import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;

/**
 * A {@link QueryResourceAllocator} that, in addition to planning
 * resources, admits the query through a queue.
 */
public interface QueryResourceManager extends QueryResourceAllocator {

  /**
   * Tells the Foreman whether this resource manager actually queues, so
   * that expensive logic can be skipped when it does not. The answer is
   * a static attribute per Drillbit run.
   *
   * @return true if queries are queued, false otherwise
   */
  boolean hasQueue();

  /**
   * Supplies the query cost for the cases in which the Foreman has only a
   * cost and no full plan. No memory planning is done in that case, but
   * the cost is still needed to choose the queue for the job.
   *
   * @param cost the cost of the query
   */
  void setCost(double cost);

  /**
   * Admits the query into the cluster, blocking until the query can run.
   * (Later revisions may use a more thread-friendly approach.)
   *
   * @throws QueueTimeoutException if the query timed out waiting to
   * be admitted
   * @throws QueryQueueException if something goes wrong with the
   * queue mechanism
   */
  void admit() throws QueueTimeoutException, QueryQueueException;

  /**
   * Reports the queue (if any) on which the query was placed. The value
   * is valid only after the query is admitted.
   *
   * @return queue name, or null if queuing is not enabled
   */
  String queueName();

  /**
   * Marks the query as completing and gives up its slot in the cluster,
   * releasing any lease that may be held for a system with queues.
   */
  void exit();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java new file mode 100644 index 0000000..71dabaf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.drill.exec.work.foreman.rm;

import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.work.foreman.Foreman;

/**
 * Resource manager for the whole Drillbit, shared by every query. It
 * manages memory today; CPU management is planned. Queries are the
 * primary consumer of resources, so total use is controlled by throttling
 * queries into the system and by allocating resources to each query. A
 * "null" implementation covers the case of no queuing; clearly, that case
 * cannot effectively control resource use.
 */
public interface ResourceManager {

  /**
   * Reports the memory, in bytes, assigned to each node in a Drill
   * cluster. Drill requires that nodes are symmetrical, so the memory on
   * any one node is also the memory on all other nodes.
   *
   * @return the memory, in bytes, available in each Drillbit
   */
  long memoryPerNode();

  /**
   * @return the CPU count per node, as reported by the implementation
   */
  int cpusPerNode();

  /**
   * Creates a resource manager used to prepare or describe a query. No
   * queuing is done in this form, but the plan is created as if queuing
   * had been done. Used when executing EXPLAIN PLAN.
   *
   * @param queryContext the context of the query to be planned
   * @return a resource manager for the query
   */
  QueryResourceAllocator newResourceAllocator(QueryContext queryContext);

  /**
   * Creates a resource manager used to execute a query.
   *
   * @param foreman
   *          Foreman which manages the execution
   * @return a resource manager for the query
   */
  QueryResourceManager newQueryRM(final Foreman foreman);

  void close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java new file mode 100644 index 0000000..4305891 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.local.LocalClusterCoordinator; +import org.apache.drill.exec.server.DrillbitContext; + +/** + * Builds the proper resource manager and queue implementation for the configured + * system options. + * <p> + * <ul> + * <li>If the Drillbit is embedded<ul> + * <li>If queues are enabled, then the admission-controlled resource manager + * with the local query queue.</li> + * <li>Otherwise, the default resource manager and no queues.</li> + * </ul></li> + * <li>If the Drillbit is in a cluster, a <tt>DynamicResourceManager</tt> is returned, which presumably chooses as follows (not visible here; confirm)<ul> + * <li>If queues are enabled, then the admission-controlled resource manager + * with the distributed query queue.</li> + * <li>Otherwise, the default resource manager and no queues.</li> + * </ul></li> + * </ul> + * Configuration settings: + * <dl> + * <dt>Cluster coordinator instance</dt> + * <dd>If an instance of <tt>LocalClusterCoordinator</tt>, the Drillbit is + * embedded, else it is in a cluster.</dd> + * <dt><tt>drill.exec.queue.embedded.enable</tt> boot config</dt> + * <dd>If enabled, and if embedded, then use the local queue.</dd> + * <dt><tt>exec.queue.enable</tt> system option</dt> + * <dd>If enabled, and if in a cluster, then use the distributed queue.</dd> + * </dl> + */ +public class ResourceManagerBuilder { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceManagerBuilder.class); + + private DrillbitContext context ; + + public ResourceManagerBuilder(final DrillbitContext context) { + this.context = context; + } + + @SuppressWarnings("resource") + public ResourceManager build() { + ClusterCoordinator coord = context.getClusterCoordinator(); + DrillConfig config = context.getConfig(); + if (coord instanceof LocalClusterCoordinator) { + if (config.getBoolean(EmbeddedQueryQueue.ENABLED)) { + logger.debug("Enabling embedded, local 
query queue."); + return new ThrottledResourceManager(context, new EmbeddedQueryQueue(context)); + } else { + logger.debug("No query queueing enabled."); + return new DefaultResourceManager(); + } + } else { + return new DynamicResourceManager(context); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java new file mode 100644 index 0000000..b46fe09 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn; +import org.apache.drill.exec.work.foreman.Foreman; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +/** + * Global resource manager that provides basic admission control (AC) via a + * configured queue: either the ZooKeeper-based distributed queue or the + * in-process embedded Drillbit queue. The queue places an upper limit on the + * number of running queries. This limit then "slices" memory and CPU between + * queries: each gets the same share of resources. + * <p> + * This is a "basic" implementation. Clearly, a more advanced implementation + * could look at query cost to determine whether to give a given query more or + * less than the "standard" share. That is left as a future exercise; in this + * version we just want to get the basics working. + * <p> + * This is the resource manager level. This resource manager is paired with a + * queue implementation to produce a complete solution. 
This composition-based + * approach allows sharing of functionality across queue implementations. + */ + +public class ThrottledResourceManager extends AbstractResourceManager { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(ThrottledResourceManager.class); + + public static class QueuedResourceAllocator + implements QueryResourceAllocator { + + protected final ThrottledResourceManager rm; + protected QueryContext queryContext; + protected PhysicalPlan plan; + protected QueryWorkUnit work; + protected double queryCost; + + protected QueuedResourceAllocator(final ThrottledResourceManager rm, + QueryContext queryContext) { + this.rm = rm; + this.queryContext = queryContext; + } + + @Override + public void visitAbstractPlan(PhysicalPlan plan) { + this.plan = plan; + queryCost = plan.totalCost(); + } + + @Override + public void visitPhysicalPlan(final QueryWorkUnit work) { + this.work = work; + planMemory(); + } + + private void planMemory() { + if (plan.getProperties().hasResourcePlan) { + logger.debug("Memory already planned."); + return; + } + + // Group fragments by node. + + Map<String, Collection<PhysicalOperator>> nodeMap = buildBufferedOpMap(); + + // Memory must be symmetric to avoid bottlenecks in which one node has + // sorts (say) with less memory than another, causing skew in data arrival + // rates for downstream operators. + + int width = countBufferingOperators(nodeMap); + + // Then, share memory evenly across + // all the sort operators on that node. This handles asymmetric distribution + // such as occurs if a sort appears in the root fragment (the one with + // screen), + // which is never parallelized. 
+ + for (Entry<String, Collection<PhysicalOperator>> entry : nodeMap.entrySet()) { + planNodeMemory(entry.getKey(), entry.getValue(), width); + } + } + + private int countBufferingOperators( + Map<String, Collection<PhysicalOperator>> nodeMap) { + int width = 0; + for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) { + width = Math.max(width, fragSorts.size()); + } + return width; + } + + /** + * Given the set of buffered operators (from any number of fragments) on a + * single node, share the per-query memory equally across all the + * operators. + * + * @param nodeAddr address of the node; used here only in the log message + * @param bufferedOps the buffered operators assigned to that node + * @param width divisor applied to the node memory; the caller passes the largest per-node operator count + */ + + private void planNodeMemory(String nodeAddr, + Collection<PhysicalOperator> bufferedOps, int width) { + + // If no buffering operators, nothing to plan. + + if (bufferedOps.isEmpty()) { + return; + } + + // Divide node memory evenly among the set of operators, in any minor + // fragment, on the node. This is not very sophisticated: it does not + // deal with, say, three stacked sorts in which, if sort A runs, then + // B may be using memory, but C cannot be active. That kind of analysis + // is left as a later exercise. + + long nodeMemory = queryMemoryPerNode(); + + // Set a floor on the amount of memory per operator based on the + // configured minimum. This is likely unhelpful because we are trying + // to work around constrained memory by assuming more than we actually + // have. This may lead to an OOM at run time. 
+ + long preferredOpMemory = nodeMemory / width; + long perOpMemory = Math.max(preferredOpMemory, rm.minimumOperatorMemory()); + if (preferredOpMemory < perOpMemory) { + logger.warn("Preferred per-operator memory: {}, actual amount: {}", + preferredOpMemory, perOpMemory); + } + logger.debug( + "Query: {}, Node: {}, allocating {} bytes each for {} buffered operator(s).", + QueryIdHelper.getQueryId(queryContext.getQueryId()), nodeAddr, + perOpMemory, width); + + for (PhysicalOperator op : bufferedOps) { + + // Limit the memory to the maximum in the plan. Doing so is + // likely unnecessary, and perhaps harmful, because the pre-planned + // allocation is the default maximum hard-coded to 10 GB. This means + // that even if 20 GB is available to the sort, it won't use more + // than 10 GB. This is probably more of a bug than a feature. + + long alloc = Math.min(perOpMemory, op.getMaxAllocation()); + + // Place a floor on the memory that is the initial allocation, + // since we don't want the operator to run out of memory when it + // first starts. + + alloc = Math.max(alloc, op.getInitialAllocation()); + + if (alloc > preferredOpMemory && alloc != perOpMemory) { + logger.warn("Allocated memory of {} for {} exceeds available memory of {} " + + "due to operator minimum", + alloc, op.getClass().getSimpleName(), preferredOpMemory); + } + else if (alloc < preferredOpMemory) { + logger.warn("Allocated memory of {} for {} is less than available memory " + + "of {} due to operator limit", + alloc, op.getClass().getSimpleName(), preferredOpMemory); + } + op.setMaxAllocation(alloc); + } + } + + protected long queryMemoryPerNode() { + return rm.defaultQueryMemoryPerNode(plan.totalCost()); + } + + /** + * Build a map of buffered operators grouped by node. We start with a list of + * minor fragments, each with an endpoint (node). Multiple minor fragments + * may appear on each node, and each minor fragment may have 0, 1 or more + * buffered operators. 
+ * + * @return map from node address to the buffered operators found on that node + */ + + private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() { + Multimap<String, PhysicalOperator> map = ArrayListMultimap.create(); + getBufferedOps(map, work.getRootFragmentDefn()); + for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) { + getBufferedOps(map, defn); + } + return map.asMap(); + } + + /** + * Searches a fragment operator tree to find buffered operators within that fragment. + */ + + protected static class BufferedOpFinder extends + AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> { + @Override + public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value) + throws RuntimeException { + if (op.isBufferedOperator()) { + value.add(op); + } + visitChildren(op, value); + return null; + } + } + + private void getBufferedOps(Multimap<String, PhysicalOperator> map, + MinorFragmentDefn defn) { + List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root()); + if (!bufferedOps.isEmpty()) { + map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps); + } + } + + /** + * Search an individual fragment tree to find any buffered operators it may + * contain. + * + * @param root root operator of the fragment to search + * @return the buffered operators found in that fragment; empty if there are none + */ + + private List<PhysicalOperator> getBufferedOps(FragmentRoot root) { + List<PhysicalOperator> bufferedOps = new ArrayList<>(); + BufferedOpFinder finder = new BufferedOpFinder(); + root.accept(finder, bufferedOps); + return bufferedOps; + } + } + + /** + * Per-query resource manager. Handles resources and optional queue lease for + * a single query. As such, this is a non-shared resource: it is associated + * with a Foreman: a single thread at plan time, and a single event (in some + * thread) at query completion time. Because of these semantics, no + * synchronization is needed within this class. 
+ */ + + public static class QueuedQueryResourceManager extends QueuedResourceAllocator + implements QueryResourceManager { + + private final Foreman foreman; + private QueueLease lease; + + public QueuedQueryResourceManager(final ThrottledResourceManager rm, + final Foreman foreman) { + super(rm, foreman.getQueryContext()); + this.foreman = foreman; + } + + @Override + public void setCost(double cost) { + this.queryCost = cost; + } + + @Override + public void admit() throws QueueTimeoutException, QueryQueueException { + lease = rm.queue().enqueue(foreman.getQueryId(), queryCost); + } + + @Override + protected long queryMemoryPerNode() { + + // No lease: use static estimate. + + if (lease == null) { + return super.queryMemoryPerNode(); + } + + // Use actual memory assigned to this query. + + return lease.queryMemoryPerNode(); + } + + @Override + public void exit() { + if (lease != null) { + lease.release(); + } + lease = null; + } + + @Override + public boolean hasQueue() { return true; } + + @Override + public String queueName() { + return lease == null ? 
null : lease.queueName(); + } + } + + private final QueryQueue queue; + + public ThrottledResourceManager(final DrillbitContext drillbitContext, + final QueryQueue queue) { + super(drillbitContext); + this.queue = queue; + queue.setMemoryPerNode(memoryPerNode()); + } + + public long minimumOperatorMemory() { + return queue.minimumOperatorMemory(); + } + + public long defaultQueryMemoryPerNode(double cost) { + return queue.defaultQueryMemoryPerNode(cost); + } + + public QueryQueue queue() { return queue; } + + @Override + public QueryResourceAllocator newResourceAllocator( + QueryContext queryContext) { + return new QueuedResourceAllocator(this, queryContext); + } + + @Override + public QueryResourceManager newQueryRM(Foreman foreman) { + return new QueuedQueryResourceManager(this, foreman); + } + + @Override + public void close() { + queue.close(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java new file mode 100644 index 0000000..0b9e9da --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides resource management and queuing support for the Drill foreman. + * The resource manager tracks total resources available to Drill. Several + * implementations are available: a default implementation for systems without + * queueing and an admission-controlled (AC) version for systems with queues. + * <p> + * Each resource manager provides a per-query manager that is responsible + * for queuing the query (if needed) and memory allocation to the query based + * on query characteristics and memory assigned to the query. + * <p> + * Provides two different queue implementations: a distributed ZooKeeper queue + * and a local queue useful for embedded Drillbits (and for testing). + */ +package org.apache.drill.exec.work.foreman.rm;