http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d01d8fd..a1f9d9f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,17 +17,15 @@
  */
 package org.apache.drill.exec.work.foreman;
 
-import com.codahale.metrics.Counter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.drill.common.CatastrophicFailure;
 import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -36,9 +34,6 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.coord.ClusterCoordinator;
-import org.apache.drill.exec.coord.DistributedSemaphore;
-import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.metrics.DrillMetrics;
@@ -73,43 +68,51 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.util.MemoryAllocationUtilities;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
 import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query 
where this
  * is the driving/root node.
  *
  * The flow is as follows:
- * - Foreman is submitted as a runnable.
- * - Runnable does query planning.
- * - state changes from PENDING to RUNNING
- * - Runnable sends out starting fragments
- * - Status listener are activated
- * - The Runnable's run() completes, but the Foreman stays around
- * - Foreman listens for state change messages.
- * - state change messages can drive the state to FAILED or CANCELED, in which 
case
- *   messages are sent to running fragments to terminate
- * - when all fragments complete, state change messages drive the state to 
COMPLETED
+ * <ul>
+ * <li>Foreman is submitted as a runnable.</li>
+ * <li>Runnable does query planning.</li>
+ * <li>state changes from PENDING to RUNNING</li>
+ * <li>Runnable sends out starting fragments</li>
+ * <li>Status listeners are activated</li>
+ * <li>The Runnable's run() completes, but the Foreman stays around</li>
+ * <li>Foreman listens for state change messages.</li>
+ * <li>state change messages can drive the state to FAILED or CANCELED, in which case
+ *   messages are sent to running fragments to terminate</li>
+ * <li>when all fragments complete, state change messages drive the state to COMPLETED</li>
+ * </ul>
  */
+
 public class Foreman implements Runnable {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Foreman.class);
   private static final org.slf4j.Logger queryLogger = 
org.slf4j.LoggerFactory.getLogger("query.logger");
@@ -136,15 +139,13 @@ public class Foreman implements Runnable {
   private boolean resume = false;
   private final ProfileOption profileOption;
 
-  private volatile DistributedLease lease; // used to limit the number of 
concurrent queries
+  private final QueryResourceManager queryRM;
 
   private final ResponseSendListener responseListener = new 
ResponseSendListener();
   private final StateSwitch stateSwitch = new StateSwitch();
   private final ForemanResult foremanResult = new ForemanResult();
   private final ConnectionClosedListener closeListener = new 
ConnectionClosedListener();
   private final ChannelFuture closeFuture;
-  private final boolean queuingEnabled;
-
 
   private String queryText;
 
@@ -173,12 +174,9 @@ public class Foreman implements Runnable {
     queryManager = new QueryManager(queryId, queryRequest, 
drillbitContext.getStoreProvider(),
         drillbitContext.getClusterCoordinator(), this);
 
-    final OptionManager optionManager = queryContext.getOptions();
-    queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
-
-    final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : 
QueryState.STARTING;
-    recordNewState(initialState);
+    recordNewState(QueryState.ENQUEUED);
     enqueuedQueries.inc();
+    queryRM = drillbitContext.getResourceManager().newQueryRM(this);
 
     profileOption = setProfileOption(queryContext.getOptions());
   }
@@ -350,20 +348,6 @@ public class Foreman implements Runnable {
      */
   }
 
-  private void releaseLease() {
-    while (lease != null) {
-      try {
-        lease.close();
-        lease = null;
-      } catch (final InterruptedException e) {
-        // if we end up here, the while loop will try again
-      } catch (final Exception e) {
-        logger.warn("Failure while releasing lease.", e);
-        break;
-      }
-    }
-  }
-
   private void parseAndRunLogicalPlan(final String json) throws 
ExecutionSetupException {
     LogicalPlan logicalPlan;
     try {
@@ -431,18 +415,17 @@ public class Foreman implements Runnable {
 
   private void runPhysicalPlan(final PhysicalPlan plan) throws 
ExecutionSetupException {
     validatePlan(plan);
-    MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, 
queryContext);
-    //Marking endTime of Planning
-    queryManager.markPlanningEndTime();
-
-    if (queuingEnabled) {
-      acquireQuerySemaphore(plan);
-      moveToState(QueryState.STARTING, null);
-      //Marking endTime of Waiting in Queue
-      queryManager.markQueueWaitEndTime();
-    }
 
+    queryRM.visitAbstractPlan(plan);
     final QueryWorkUnit work = getQueryWorkUnit(plan);
+    queryRM.visitPhysicalPlan(work);
+    queryRM.setCost(plan.totalCost());
+    queryManager.setTotalCost(plan.totalCost());
+    work.applyPlan(drillbitContext.getPlanReader());
+    logWorkUnit(work);
+    admit(work);
+    queryManager.setQueueName(queryRM.queueName());
+
     final List<PlanFragment> planFragments = work.getFragments();
     final PlanFragment rootPlanFragment = work.getRootFragment();
     assert queryId == rootPlanFragment.getHandle().getQueryId();
@@ -461,6 +444,22 @@ public class Foreman implements Runnable {
     logger.debug("Fragments running.");
   }
 
+  private void admit(QueryWorkUnit work) throws ForemanSetupException {
+    queryManager.markPlanningEndTime();
+    try {
+      queryRM.admit();
+    } catch (QueueTimeoutException e) {
+      throw UserException
+          .resourceError()
+          .message(e.getMessage())
+          .build(logger);
+    } catch (QueryQueueException e) {
+      throw new ForemanSetupException(e.getMessage(), e);
+    }
+    moveToState(QueryState.STARTING, null);
+    queryManager.markQueueWaitEndTime();
+  }
+
   /**
    * This is a helper method to run query based on the list of PlanFragment 
that were planned
    * at some point of time
@@ -495,10 +494,8 @@ public class Foreman implements Runnable {
     } catch (IOException e) {
       throw new ExecutionSetupException(String.format("Unable to parse 
FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
     }
-    if (queuingEnabled) {
-      acquireQuerySemaphore(rootOperator.getCost());
-      moveToState(QueryState.STARTING, null);
-    }
+    queryRM.setCost(rootOperator.getCost());
+    admit(null);
     drillbitContext.getWorkBus().addFragmentStatusListener(queryId, 
queryManager.getFragmentStatusListener());
     
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
@@ -548,62 +545,6 @@ public class Foreman implements Runnable {
     }
   }
 
-  /**
-   * This limits the number of "small" and "large" queries that a Drill 
cluster will run
-   * simultaneously, if queueing is enabled. If the query is unable to run, 
this will block
-   * until it can. Beware that this is called under run(), and so will consume 
a Thread
-   * while it waits for the required distributed semaphore.
-   *
-   * @param plan the query plan
-   * @throws ForemanSetupException
-   */
-  private void acquireQuerySemaphore(final PhysicalPlan plan) throws 
ForemanSetupException {
-    double totalCost = 0;
-    for (final PhysicalOperator ops : plan.getSortedOperators()) {
-      totalCost += ops.getCost();
-    }
-
-    acquireQuerySemaphore(totalCost);
-    return;
-  }
-
-  private void acquireQuerySemaphore(double totalCost) throws 
ForemanSetupException {
-    final OptionManager optionManager = queryContext.getOptions();
-    final long queueThreshold = 
optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
-
-    final long queueTimeout = 
optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
-    final String queueName;
-
-    try {
-      final ClusterCoordinator clusterCoordinator = 
drillbitContext.getClusterCoordinator();
-      final DistributedSemaphore distributedSemaphore;
-
-      // get the appropriate semaphore
-      if (totalCost > queueThreshold) {
-        final int largeQueue = (int) 
optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
-        distributedSemaphore = clusterCoordinator.getSemaphore("query.large", 
largeQueue);
-        queueName = "large";
-      } else {
-        final int smallQueue = (int) 
optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
-        distributedSemaphore = clusterCoordinator.getSemaphore("query.small", 
smallQueue);
-        queueName = "small";
-      }
-
-      lease = distributedSemaphore.acquire(queueTimeout, 
TimeUnit.MILLISECONDS);
-    } catch (final Exception e) {
-      throw new ForemanSetupException("Unable to acquire slot for query.", e);
-    }
-
-    if (lease == null) {
-      throw UserException
-          .resourceError()
-          .message(
-              "Unable to acquire queue resources for query within timeout.  
Timeout for %s queue was set at %d seconds.",
-              queueName, queueTimeout / 1000)
-          .build(logger);
-    }
-  }
-
   Exception getCurrentException() {
     return foremanResult.getException();
   }
@@ -612,54 +553,55 @@ public class Foreman implements Runnable {
     final PhysicalOperator rootOperator = 
plan.getSortedOperators(false).iterator().next();
     final Fragment rootFragment = 
rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
     final SimpleParallelizer parallelizer = new 
SimpleParallelizer(queryContext);
-    final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(
+    return parallelizer.getFragments(
         queryContext.getOptions().getOptionList(), 
queryContext.getCurrentEndpoint(),
-        queryId, queryContext.getActiveEndpoints(), 
drillbitContext.getPlanReader(), rootFragment,
+        queryId, queryContext.getActiveEndpoints(), rootFragment,
         initiatingClient.getSession(), queryContext.getQueryContextInfo());
+  }
 
-    if (logger.isTraceEnabled()) {
-      final StringBuilder sb = new StringBuilder();
-      sb.append("PlanFragments for query ");
-      sb.append(queryId);
+  private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
+    if (! logger.isTraceEnabled()) {
+      return;
+    }
+    final StringBuilder sb = new StringBuilder();
+    sb.append("PlanFragments for query ");
+    sb.append(queryId);
+    sb.append('\n');
+
+    final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
+    final int fragmentCount = planFragments.size();
+    int fragmentIndex = 0;
+    for(final PlanFragment planFragment : planFragments) {
+      final FragmentHandle fragmentHandle = planFragment.getHandle();
+      sb.append("PlanFragment(");
+      sb.append(++fragmentIndex);
+      sb.append('/');
+      sb.append(fragmentCount);
+      sb.append(") major_fragment_id ");
+      sb.append(fragmentHandle.getMajorFragmentId());
+      sb.append(" minor_fragment_id ");
+      sb.append(fragmentHandle.getMinorFragmentId());
       sb.append('\n');
 
-      final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
-      final int fragmentCount = planFragments.size();
-      int fragmentIndex = 0;
-      for(final PlanFragment planFragment : planFragments) {
-        final FragmentHandle fragmentHandle = planFragment.getHandle();
-        sb.append("PlanFragment(");
-        sb.append(++fragmentIndex);
-        sb.append('/');
-        sb.append(fragmentCount);
-        sb.append(") major_fragment_id ");
-        sb.append(fragmentHandle.getMajorFragmentId());
-        sb.append(" minor_fragment_id ");
-        sb.append(fragmentHandle.getMinorFragmentId());
-        sb.append('\n');
-
-        final DrillbitEndpoint endpointAssignment = 
planFragment.getAssignment();
-        sb.append("  DrillbitEndpoint address ");
-        sb.append(endpointAssignment.getAddress());
-        sb.append('\n');
-
-        String jsonString = "<<malformed JSON>>";
-        sb.append("  fragment_json: ");
-        final ObjectMapper objectMapper = new ObjectMapper();
-        try
-        {
-          final Object json = 
objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
-          jsonString = 
objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
-        } catch(final Exception e) {
-          // we've already set jsonString to a fallback value
-        }
-        sb.append(jsonString);
+      final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+      sb.append("  DrillbitEndpoint address ");
+      sb.append(endpointAssignment.getAddress());
+      sb.append('\n');
 
-        logger.trace(sb.toString());
+      String jsonString = "<<malformed JSON>>";
+      sb.append("  fragment_json: ");
+      final ObjectMapper objectMapper = new ObjectMapper();
+      try
+      {
+        final Object json = 
objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+        jsonString = 
objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
+      } catch(final Exception e) {
+        // we've already set jsonString to a fallback value
       }
-    }
+      sb.append(jsonString);
 
-    return queryWorkUnit;
+      logger.trace(sb.toString());
+    }
   }
 
   /**
@@ -897,7 +839,7 @@ public class Foreman implements Runnable {
       runningQueries.dec();
       completedQueries.inc();
       try {
-        releaseLease();
+        queryRM.exit();
       } finally {
         isClosed = true;
       }
@@ -953,7 +895,7 @@ public class Foreman implements Runnable {
         foremanResult.setCompleted(QueryState.CANCELED);
         /*
          * We don't close the foremanResult until we've gotten
-         * acknowledgements, which happens below in the case for current state
+         * acknowledgments, which happens below in the case for current state
          * == CANCELLATION_REQUESTED.
          */
         return;

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index ecbccf3..216a80d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.SchemaUserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -99,6 +98,15 @@ public class QueryManager implements AutoCloseable {
   // Is the query saved in transient store
   private boolean inTransientStore;
 
+  /**
+   * Total query cost. This value is used to place the query into a queue
+   * and so has meaning to the user who wants to predict queue placement.
+   */
+
+  private double totalCost;
+
+  private String queueName;
+
   public QueryManager(final QueryId queryId, final RunQuery runQuery, final 
PersistentStoreProvider storeProvider,
       final ClusterCoordinator coordinator, final Foreman foreman) {
     this.queryId =  queryId;
@@ -191,6 +199,7 @@ public class QueryManager implements AutoCloseable {
    * (3) Leaf fragment: running, send the cancel signal through a tunnel. The 
cancel is done directly.
    */
   void cancelExecutingFragments(final DrillbitContext drillbitContext) {
+    @SuppressWarnings("resource")
     final Controller controller = drillbitContext.getController();
     for(final FragmentData data : fragmentDataSet) {
       switch(data.getState()) {
@@ -219,6 +228,7 @@ public class QueryManager implements AutoCloseable {
    * sending any message. Resume all fragments through the control tunnel.
    */
   void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
+    @SuppressWarnings("resource")
     final Controller controller = drillbitContext.getController();
     for(final FragmentData data : fragmentDataSet) {
       final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -318,6 +328,8 @@ public class QueryManager implements AutoCloseable {
         .setUser(foreman.getQueryContext().getQueryUserName())
         .setForeman(foreman.getQueryContext().getCurrentEndpoint())
         .setStart(startTime)
+        .setTotalCost(totalCost)
+        .setQueueName(queueName == null ? "-" : queueName)
         .setOptionsJson(getQueryOptionsAsJson());
 
     if (queryText != null) {
@@ -332,7 +344,6 @@ public class QueryManager implements AutoCloseable {
   }
 
   private QueryProfile getQueryProfile(UserException ex) {
-    final String queryText = foreman.getQueryText();
     final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
         .setUser(foreman.getQueryContext().getQueryUserName())
         .setType(runQuery.getType())
@@ -345,6 +356,8 @@ public class QueryManager implements AutoCloseable {
         .setQueueWaitEnd(queueWaitEndTime)
         .setTotalFragments(fragmentDataSet.size())
         .setFinishedFragments(finishedFragments.get())
+        .setTotalCost(totalCost)
+        .setQueueName(queueName == null ? "-" : queueName)
         .setOptionsJson(getQueryOptionsAsJson());
 
     if (ex != null) {
@@ -360,6 +373,7 @@ public class QueryManager implements AutoCloseable {
       profileBuilder.setPlan(planText);
     }
 
+    final String queryText = foreman.getQueryText();
     if (queryText != null) {
       profileBuilder.setQuery(queryText);
     }
@@ -392,7 +406,6 @@ public class QueryManager implements AutoCloseable {
       profileBuilder.addFragmentProfile(builder);
       return true;
     }
-
   }
 
   private class InnerIter implements IntObjectPredicate<FragmentData> {
@@ -407,7 +420,6 @@ public class QueryManager implements AutoCloseable {
       builder.addMinorFragmentProfile(data.getProfile());
       return true;
     }
-
   }
 
   void setPlanText(final String planText) {
@@ -430,6 +442,14 @@ public class QueryManager implements AutoCloseable {
     queueWaitEndTime = System.currentTimeMillis();
   }
 
+  public void setTotalCost(double totalCost) {
+    this.totalCost = totalCost;
+  }
+
+  public void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
   /**
    * Internal class used to track the number of pending completion messages 
required from particular node. This allows
    * to know for each node that is part of this query, what portion of 
fragments are still outstanding. In the case that
@@ -536,7 +556,7 @@ public class QueryManager implements AutoCloseable {
     return drillbitStatusListener;
   }
 
-  private final DrillbitStatusListener drillbitStatusListener = new 
DrillbitStatusListener(){
+  private final DrillbitStatusListener drillbitStatusListener = new 
DrillbitStatusListener() {
 
     @Override
     public void drillbitRegistered(final Set<DrillbitEndpoint> 
registeredDrillbits) {

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
new file mode 100644
index 0000000..9bcadda
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Abstract base class for a resource manager. Handles tasks common to all
+ * resource managers: learning the resources available on this Drillbit.
+ * In the current version, Drillbits must be symmetrical, so that knowing
+ * the resources on one node is sufficient to know resources available on
+ * all nodes.
+ */
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+  protected final DrillbitContext context;
+  private final long memoryPerNode;
+  private final int cpusPerNode;
+
+  public AbstractResourceManager(final DrillbitContext context) {
+    this.context = context;
+    DrillConfig config = context.getConfig();
+
+    // Normally we use the actual direct memory configured on the JVM command
+    // line. However, if the config param is set, we use that instead (if it is
+    // lower than actual memory). Primarily for testing.
+
+    long memLimit = DrillConfig.getMaxDirectMemory();
+    long configMemoryPerNode = 
config.getBytes(ExecConstants.MAX_MEMORY_PER_NODE);
+    if (configMemoryPerNode > 0) {
+      memLimit = Math.min(memLimit, configMemoryPerNode);
+    }
+    memoryPerNode = memLimit;
+
+    // Do the same for CPUs.
+
+    int cpuLimit = Runtime.getRuntime().availableProcessors();
+    int configCpusPerNode = config.getInt(ExecConstants.MAX_CPUS_PER_NODE);
+    if (configCpusPerNode > 0) {
+      cpuLimit = Math.min(cpuLimit, configCpusPerNode);
+    }
+    cpusPerNode = cpuLimit;
+  }
+
+  @Override
+  public long memoryPerNode() { return memoryPerNode; }
+
+  @Override
+  public int cpusPerNode() { return cpusPerNode; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
new file mode 100644
index 0000000..c28fbbb
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Represents a default resource manager for clusters that do not provide query
+ * queues. Without queues to provide a hard limit on the query admission rate,
+ * the number of active queries must be estimated and the resulting resource
+ * allocations will be rough estimates.
+ */
+
+public class DefaultResourceManager implements ResourceManager {
+
+  public static class DefaultResourceAllocator implements 
QueryResourceAllocator {
+
+    private QueryContext queryContext;
+
+    protected DefaultResourceAllocator(QueryContext queryContext) {
+      this.queryContext = queryContext;
+    }
+
+    @Override
+    public void visitAbstractPlan(PhysicalPlan plan) {
+      if (plan == null || plan.getProperties().hasResourcePlan) {
+        return;
+      }
+      MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, 
queryContext);
+    }
+
+    @Override
+    public void visitPhysicalPlan(QueryWorkUnit work) {
+    }
+  }
+
+  public static class DefaultQueryResourceManager extends 
DefaultResourceAllocator implements QueryResourceManager {
+
+    @SuppressWarnings("unused")
+    private final DefaultResourceManager rm;
+
+    public DefaultQueryResourceManager(final DefaultResourceManager rm, final 
Foreman foreman) {
+      super(foreman.getQueryContext());
+      this.rm = rm;
+    }
+
+    @Override
+    public void setCost(double cost) {
+      // Nothing to do by default.
+    }
+
+    @Override
+    public void admit() {
+      // No queueing by default
+    }
+
+    @Override
+    public void exit() {
+      // No queueing by default
+    }
+
+    @Override
+    public boolean hasQueue() { return false; }
+
+    @Override
+    public String queueName() { return null; }
+  }
+
+  public final long memoryPerNode;
+  public final int cpusPerNode;
+
+  public DefaultResourceManager() {
+    memoryPerNode = DrillConfig.getMaxDirectMemory();
+
+    // Note: CPUs are not yet used, they will be used in a future
+    // enhancement.
+
+    cpusPerNode = Runtime.getRuntime().availableProcessors();
+  }
+
+  @Override
+  public long memoryPerNode() { return memoryPerNode; }
+
+  @Override
+  public int cpusPerNode() { return cpusPerNode; }
+
+  @Override
+  public QueryResourceAllocator newResourceAllocator(QueryContext 
queryContext) {
+    return new DefaultResourceAllocator(queryContext);
+  }
+
+  @Override
+  public QueryResourceManager newQueryRM(final Foreman foreman) {
+    return new DefaultQueryResourceManager(this, foreman);
+  }
+
+  @Override
+  public void close() { }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
new file mode 100644
index 0000000..9a4c78d
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+
+/**
+ * Distributed query queue which uses a Zookeeper distributed semaphore to
+ * control queuing across the cluster. The distributed queue is actually two
+ * queues: one for "small" queries, another for "large" queries. Query size is
+ * determined by the Planner's estimate of query cost.
+ * <p>
+ * This queue is configured using system options:
+ * <dl>
+ * <dt><tt>exec.queue.enable</tt>
+ * </dt>
+ * <dd>Set to true to enable the distributed queue.</dd>
+ * <dt><tt>exec.queue.large</tt>
+ * </dt>
+ * <dd>The maximum number of large queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.small</tt>
+ * </dt>
+ * <dd>The maximum number of small queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.threshold</tt>
+ * </dt>
+ * <dd>The cost threshold. Queries below this size are small; queries at
+ * or above this size are large.</dd>
+ * <dt><tt>exec.queue.timeout_millis</tt>
+ * </dt>
+ * <dd>The maximum time (in milliseconds) a query will wait in the
+ * queue before failing.</dd>
+ * </dl>
+ * <p>
+ * The above values are refreshed every five seconds. This aids performance
+ * a bit in systems with very high query arrival rates.
+ */
+
+public class DistributedQueryQueue implements QueryQueue {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
+
+  /**
+   * Lease handed to a query admitted through the distributed queue. Wraps
+   * the underlying {@link DistributedLease} and records which queue
+   * granted it.
+   */
+  private class DistributedQueueLease implements QueueLease {
+    private final String queueName;
+    private final QueryId queryId;
+
+    /**
+     * Memory allocated to the query. Every query in a queue follows the
+     * same allocation rules, but those rules can change whenever the user
+     * changes system options, so the value computed when the lease was
+     * granted is captured here.
+     */
+    private long queryMemory;
+
+    /** Underlying semaphore lease; set to null by the enclosing queue once closed. */
+    private DistributedLease lease;
+
+    public DistributedQueueLease(QueryId queryId, String queueName,
+                    DistributedLease lease, long queryMemory) {
+      this.queryMemory = queryMemory;
+      this.lease = lease;
+      this.queueName = queueName;
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void release() {
+      DistributedQueryQueue.this.release(this);
+    }
+
+    @Override
+    public String queueName() {
+      return queueName;
+    }
+
+    @Override
+    public long queryMemoryPerNode() {
+      return queryMemory;
+    }
+
+    @Override
+    public String toString() {
+      final String id = QueryIdHelper.getQueryId(queryId);
+      return "Lease for " + queueName + " queue to query " + id;
+    }
+  }
+
+  /**
+   * Read-only snapshot of the queue's internal state, intended for status
+   * reporting such as the UI.
+   */
+
+  @XmlRootElement
+  public static class ZKQueueInfo {
+    public final int smallQueueSize;
+    public final int largeQueueSize;
+    public final double queueThreshold;
+    public final long memoryPerNode;
+    public final long memoryPerSmallQuery;
+    public final long memoryPerLargeQuery;
+
+    /**
+     * Copies the current configuration and memory figures out of the queue.
+     *
+     * @param queue the queue to snapshot
+     */
+    public ZKQueueInfo(DistributedQueryQueue queue) {
+      final ConfigSet config = queue.configSet;
+      smallQueueSize = config.smallQueueSize;
+      largeQueueSize = config.largeQueueSize;
+      queueThreshold = config.queueThreshold;
+
+      memoryPerNode = queue.memoryPerNode;
+      memoryPerSmallQuery = queue.memoryPerSmallQuery;
+      memoryPerLargeQuery = queue.memoryPerLargeQuery;
+    }
+  }
+
+  /**
+   * Callback through which the queue asks whether shutdown is in progress;
+   * consulted while retrying a lease release.
+   */
+  public interface StatusAdapter {
+
+    /**
+     * @return true if shutdown is in progress
+     */
+    boolean inShutDown();
+  }
+
+  /**
+   * Immutable snapshot of the runtime configuration options. A fresh
+   * snapshot is taken on each poll and compared with the previous one to
+   * detect option changes.
+   */
+
+  private static class ConfigSet {
+    private final long queueThreshold;
+    private final long queueTimeout;
+    private final int largeQueueSize;
+    private final int smallQueueSize;
+    private final double largeToSmallRatio;
+    private final double reserveMemoryRatio;
+    private final long minimumOperatorMemory;
+
+    /**
+     * Reads the current value of every queue-related option.
+     *
+     * @param optionManager source of the system option values
+     */
+    public ConfigSet(SystemOptionManager optionManager) {
+      queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+      queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+
+      // Option manager supports only long values, but we do not expect
+      // more than 2 billion active queries, so queue size is stored as
+      // an int.
+      // TODO: Once DRILL-5832 is available, add an getInt() method to
+      // the option system to get the value as an int and do a range
+      // check.
+
+      largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
+      smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
+      largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
+      reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
+      minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+    }
+
+    /**
+     * Determine if this config set is the same as another one. Detects
+     * whether the configuration has changed between one sync point and
+     * another.
+     * <p>
+     * Note that we cannot use <tt>equals()</tt> here as, according to
+     * Drill practice, <tt>equals()</tt> is for use in collections and
+     * must be accompanied by a <tt>hashCode()</tt> method. Since this
+     * class will never be used in a collection, and does not need a
+     * hash function, we cannot use <tt>equals()</tt>.
+     *
+     * @param otherSet another snapshot taken at another time; may be
+     * null when no earlier snapshot exists, in which case the
+     * configuration is reported as changed
+     * @return true if this configuration is the same as another
+     * (no update between the two snapshots), false if the config has
+     * changed between the snapshots or there is no other snapshot
+     */
+
+    public boolean isSameAs(ConfigSet otherSet) {
+      // The very first refresh compares against a snapshot that does not
+      // exist yet; treat that as "changed" rather than dereferencing null.
+      if (otherSet == null) {
+        return false;
+      }
+      return queueThreshold == otherSet.queueThreshold &&
+             queueTimeout == otherSet.queueTimeout &&
+             largeQueueSize == otherSet.largeQueueSize &&
+             smallQueueSize == otherSet.smallQueueSize &&
+             largeToSmallRatio == otherSet.largeToSmallRatio &&
+             reserveMemoryRatio == otherSet.reserveMemoryRatio &&
+             minimumOperatorMemory == otherSet.minimumOperatorMemory;
+    }
+  }
+
+  private long memoryPerNode;
+  private SystemOptionManager optionManager;
+  private ConfigSet configSet;
+  private ClusterCoordinator clusterCoordinator;
+  private long nextRefreshTime;
+  private long memoryPerSmallQuery;
+  private long memoryPerLargeQuery;
+  private final StatusAdapter statusAdapter;
+
+  /**
+   * Creates the queue, taking the option manager and cluster coordinator
+   * from the Drillbit context.
+   *
+   * @param context the Drillbit context
+   * @param adapter reports whether shutdown is in progress; may be null
+   */
+  public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
+    this.clusterCoordinator = context.getClusterCoordinator();
+    this.optionManager = context.getOptionManager();
+    this.statusAdapter = adapter;
+  }
+
+  /**
+   * Records the memory available on each node, then refreshes the
+   * configuration-derived values.
+   *
+   * @param memoryPerNode memory per node
+   */
+  @Override
+  public void setMemoryPerNode(long memoryPerNode) {
+    this.memoryPerNode = memoryPerNode;
+    this.refreshConfig();
+  }
+
+  /**
+   * Picks the per-query memory for the queue that a query of the given
+   * cost would be placed in.
+   *
+   * @param cost the query cost
+   * @return the small-query memory if the cost is below the queue
+   * threshold, otherwise the large-query memory
+   */
+  @Override
+  public long defaultQueryMemoryPerNode(double cost) {
+    if (cost < configSet.queueThreshold) {
+      return memoryPerSmallQuery;
+    }
+    return memoryPerLargeQuery;
+  }
+
+  @Override
+  public long minimumOperatorMemory() { return 
configSet.minimumOperatorMemory; }
+
+  /**
+   * This limits the number of "small" and "large" queries that a Drill cluster will run
+   * simultaneously, if queuing is enabled. If the query is unable to run, this will block
+   * until it can. Beware that this is called under run(), and so will consume a thread
+   * while it waits for the required distributed semaphore.
+   *
+   * @param queryId query identifier
+   * @param cost the query cost; queries at or above the configured threshold
+   * go to the large queue, all others to the small queue
+   * @return the lease granted to the query
+   * @throws QueryQueueException if the underlying ZK queuing mechanism fails
+   * @throws QueueTimeoutException if the query waits too long in the
+   * queue
+   */
+
+  @SuppressWarnings("resource")
+  @Override
+  public QueueLease enqueue(QueryId queryId, double cost) throws QueryQueueException, QueueTimeoutException {
+    final String queueName;
+    DistributedLease lease = null;
+    long queryMemory;
+    final long timeoutMs;
+    final DistributedSemaphore distributedSemaphore;
+    try {
+
+      // Only the refresh and queue computation is synchronized.
+
+      synchronized(this) {
+        refreshConfig();
+
+        // Take the timeout from the same snapshot that selects the queue so
+        // a concurrent refresh cannot change it between the wait and the
+        // timeout report below.
+        final ConfigSet config = configSet;
+        timeoutMs = config.queueTimeout;
+
+        // get the appropriate semaphore
+        if (cost >= config.queueThreshold) {
+          distributedSemaphore = clusterCoordinator.getSemaphore("query.large", config.largeQueueSize);
+          queueName = "large";
+          queryMemory = memoryPerLargeQuery;
+        } else {
+          distributedSemaphore = clusterCoordinator.getSemaphore("query.small", config.smallQueueSize);
+          queueName = "small";
+          queryMemory = memoryPerSmallQuery;
+        }
+      }
+      logger.debug("Query {} with cost {} placed into the {} queue.",
+                   QueryIdHelper.getQueryId(queryId), cost, queueName);
+
+      lease = distributedSemaphore.acquire(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (final Exception e) {
+      logger.error("Unable to acquire slot for query " +
+                   QueryIdHelper.getQueryId(queryId), e);
+      throw new QueryQueueException("Unable to acquire slot for query.", e);
+    }
+
+    if (lease == null) {
+      long timeoutSecs = Math.round(timeoutMs / 1000.0);
+      logger.warn("Queue timeout: {} after {} seconds.", queueName, timeoutSecs);
+
+      // QueueTimeoutException takes (and reports) milliseconds, so pass the
+      // millisecond value, clamped to the int range the constructor accepts.
+      throw new QueueTimeoutException(queryId, queueName,
+          (int) Math.min(timeoutMs, Integer.MAX_VALUE));
+    }
+    return new DistributedQueueLease(queryId, queueName, lease, queryMemory);
+  }
+
+  private synchronized void refreshConfig() {
+    long now = System.currentTimeMillis();
+    if (now < nextRefreshTime) {
+      return;
+    }
+    nextRefreshTime = now + 5000;
+
+    // Only update numbers, and log changes, if something
+    // actually changes.
+
+    ConfigSet newSet = new ConfigSet(optionManager);
+    if (newSet.isSameAs(configSet)) {
+      return;
+    }
+
+    configSet = newSet;
+
+    // Divide up memory between queues using admission rate
+    // to give more memory to larger queries and less to
+    // smaller queries. We assume that large queries are
+    // larger than small queries by a factor of
+    // largeToSmallRatio.
+
+    double totalUnits = configSet.largeToSmallRatio * configSet.largeQueueSize 
+ configSet.smallQueueSize;
+    double availableMemory = Math.round(memoryPerNode * (1.0 - 
configSet.reserveMemoryRatio));
+    double memoryUnit = availableMemory / totalUnits;
+    memoryPerLargeQuery = Math.round(memoryUnit * configSet.largeToSmallRatio);
+    memoryPerSmallQuery = Math.round(memoryUnit);
+
+    logger.debug("Memory config: total memory per node = {}, available: {},  
large/small memory ratio = {}",
+        memoryPerNode, availableMemory, configSet.largeToSmallRatio);
+    logger.debug("Reserve memory ratio: {}, bytes: {}",
+        configSet.reserveMemoryRatio, memoryPerNode - availableMemory);
+    logger.debug("Minimum operator memory: {}", 
configSet.minimumOperatorMemory);
+    logger.debug("Small queue: {} slots, {} bytes per slot",
+        configSet.smallQueueSize, memoryPerSmallQuery);
+    logger.debug("Large queue: {} slots, {} bytes per slot",
+        configSet.largeQueueSize, memoryPerLargeQuery);
+    logger.debug("Cost threshold: {}, timeout: {} ms.",
+        configSet.queueThreshold, configSet.queueTimeout);
+  }
+
+  @Override
+  public boolean enabled() { return true; }
+
+  /**
+   * Refreshes the configuration if it is due, then snapshots the queue state.
+   *
+   * @return a snapshot of the current queue configuration and memory figures
+   */
+  public synchronized ZKQueueInfo getInfo() {
+    refreshConfig();
+    final ZKQueueInfo snapshot = new ZKQueueInfo(this);
+    return snapshot;
+  }
+
+  /**
+   * Closes the distributed lease held by the given queue lease. An
+   * interrupted close is retried until it succeeds, fails with another
+   * exception, or shutdown is detected. The thread's interrupt status is
+   * restored before returning so callers can still observe the interrupt.
+   *
+   * @param lease the lease to release; must be a {@code DistributedQueueLease}
+   */
+  private void release(QueueLease lease) {
+    DistributedQueueLease theLease = (DistributedQueueLease) lease;
+    boolean interrupted = false;
+    try {
+      for (;;) {
+        try {
+          theLease.lease.close();
+          theLease.lease = null;
+          break;
+        } catch (final InterruptedException e) {
+          // Remember the interrupt and fall through; the loop will try
+          // again unless we are shutting down.
+          interrupted = true;
+        } catch (final Exception e) {
+          logger.warn("Failure while releasing lease.", e);
+          break;
+        }
+        if (inShutdown()) {
+          logger.warn("In shutdown mode: abandoning attempt to release lease");
+          // Actually abandon the attempt; without this the loop would
+          // keep retrying despite the message above.
+          break;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * @return the status adapter's answer, or false when no adapter was provided
+   */
+  private boolean inShutdown() {
+    return statusAdapter != null && statusAdapter.inShutDown();
+  }
+
+  /**
+   * Closes the queue. This implementation has nothing to release.
+   */
+  @Override
+  public void close() {
+    // Intentionally empty: nothing to do.
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
new file mode 100644
index 0000000..473401f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import 
org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;
+
+/**
+ * Wrapper around the default and/or distributed resource managers
+ * to allow dynamically enabling and disabling queueing.
+ */
+
+public class DynamicResourceManager implements ResourceManager {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
+
+  private final DrillbitContext context;
+
+  // The three manager references below are mutated by refreshRM() and
+  // close(); every access goes through a synchronized method.
+  private ResourceManager defaultRm;
+  private ResourceManager queueingRm;
+  private ResourceManager activeRm;
+
+  /** Earliest time (epoch millis) at which the enable option is re-read. */
+  public long nextUpdateTime;
+
+  /** Minimum delay, in milliseconds, between two reads of the enable option. */
+  public final int recheckDelayMs = 5000;
+
+  /**
+   * Creates the manager and selects the initial active resource manager.
+   *
+   * @param context the Drillbit context, used to read options and to
+   * build the queueing resource manager
+   */
+  public DynamicResourceManager(final DrillbitContext context) {
+    this.context = context;
+    refreshRM();
+  }
+
+  /**
+   * @return the currently active resource manager, after re-checking the
+   * enable option if the recheck delay has elapsed
+   */
+  public synchronized ResourceManager activeRM() {
+    refreshRM();
+    return activeRm;
+  }
+
+  // Synchronized like the other accessors: activeRm is written under the
+  // instance lock, so it must be read under it as well.
+  @Override
+  public synchronized long memoryPerNode() {
+    return activeRm.memoryPerNode();
+  }
+
+  @Override
+  public synchronized int cpusPerNode() {
+    return activeRm.cpusPerNode();
+  }
+
+  @Override
+  public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+    refreshRM();
+    return activeRm.newResourceAllocator(queryContext);
+  }
+
+  @Override
+  public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
+    refreshRM();
+    return activeRm.newQueryRM(foreman);
+  }
+
+  /**
+   * Re-reads the queue enable option, at most once per
+   * {@link #recheckDelayMs}, and switches the active resource manager to
+   * match. Each manager is created on first use and then kept.
+   * Callers other than the constructor must hold the instance lock.
+   */
+  private void refreshRM() {
+    long now = System.currentTimeMillis();
+    if (now < nextUpdateTime) {
+      return;
+    }
+    nextUpdateTime = now + recheckDelayMs;
+    @SuppressWarnings("resource")
+    SystemOptionManager systemOptions = context.getOptionManager();
+    if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
+      if (queueingRm == null) {
+        StatusAdapter statusAdapter = new StatusAdapter() {
+          @Override
+          public boolean inShutDown() {
+            // Drill provides no shutdown state at present.
+            // TODO: Once DRILL-4286 (graceful shutdown) is merged, use the
+            // new Drillbit status to determine when the Drillbit
+            // is shutting down.
+            return false;
+          }
+        };
+        queueingRm = new ThrottledResourceManager(context,
+            new DistributedQueryQueue(context, statusAdapter));
+      }
+      if (activeRm != queueingRm) {
+        logger.debug("Enabling ZK-based query queue.");
+        activeRm = queueingRm;
+      }
+    } else {
+      if (defaultRm == null) {
+        defaultRm = new DefaultResourceManager();
+      }
+      if (activeRm != defaultRm) {
+        logger.debug("Disabling ZK-based query queue.");
+        activeRm = defaultRm;
+      }
+    }
+  }
+
+  /**
+   * Closes both underlying resource managers. Both are always attempted;
+   * the first failure is rethrown after both attempts, with any second
+   * failure attached to it as a suppressed exception.
+   */
+  @Override
+  public synchronized void close() {
+    RuntimeException ex = null;
+    try {
+      if (defaultRm != null) {
+        defaultRm.close();
+      }
+    } catch (RuntimeException e) {
+      ex = e;
+    } finally {
+      defaultRm = null;
+    }
+    try {
+      if (queueingRm != null) {
+        queueingRm.close();
+      }
+    } catch (RuntimeException e) {
+      if (ex == null) {
+        ex = e;
+      } else {
+        // Keep the second failure visible instead of dropping it.
+        ex.addSuppressed(e);
+      }
+    } finally {
+      queueingRm = null;
+    }
+    activeRm = null;
+    if (ex == null) {
+      return;
+    } else if (ex instanceof UserException) {
+      throw (UserException) ex;
+    } else {
+      throw UserException.systemError(ex)
+        .addContext("Failure closing resource managers.")
+        .build(logger);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
new file mode 100644
index 0000000..a042f8b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Query queue to be used in an embedded Drillbit. This queue has scope of only
+ * the one Drillbit (not even multiple Drillbits in the same process.) 
Primarily
+ * intended for testing, but may possibly be useful for other embedded
+ * applications.
+ * <p>
+ * Configuration is via config parameters (not via system options as for the
+ * distributed queue.)
+ * <dl>
+ * <dt><tt>drill.queue.embedded.enabled</tt></dt>
+ * <dd>Set to true to enable the embedded queue. But, this setting has effect
+ * only if the Drillbit is, in fact, embedded.</dd>
+ * <dt><tt>drill.queue.embedded.size</tt></dt>
+ * <dd>The number of active queries, all others queue. There is no upper limit
+ * on the number of queued entries.</dt>
+ * <dt><tt>drill.queue.embedded.timeout_ms</tt></dt>
+ * <dd>The maximum time a query will wait in the queue before failing.</dd>
+ * </dl>
+ */
+
+public class EmbeddedQueryQueue implements QueryQueue {
+
+  public static String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
+  public static String ENABLED = EMBEDDED_QUEUE + ".enable";
+  public static String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
+  public static String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";
+
+  public class EmbeddedQueueLease implements QueueLease {
+
+    private final QueryId queryId;
+    private boolean released;
+    private long queryMemory;
+
+    public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
+      this.queryId = queryId;
+      this.queryMemory = queryMemory;
+    }
+
+    @Override
+    public String toString( ) {
+      return new StringBuilder()
+          .append("Embedded queue lease for ")
+          .append(QueryIdHelper.getQueryId(queryId))
+          .append(released ? " (released)" : "")
+          .toString();
+    }
+
+    @Override
+    public long queryMemoryPerNode() {
+      return queryMemory;
+    }
+
+    @Override
+    public void release() {
+      EmbeddedQueryQueue.this.release(this);
+      released = true;
+    }
+
+    @VisibleForTesting
+    boolean isReleased() { return released; }
+
+    @Override
+    public String queueName() { return "local-queue"; }
+  }
+
+  private final int queueTimeoutMs;
+  private final int queueSize;
+  private final Semaphore semaphore;
+  private long memoryPerQuery;
+  private final long minimumOperatorMemory;
+
+  public EmbeddedQueryQueue(DrillbitContext context) {
+    DrillConfig config = context.getConfig();
+    queueTimeoutMs = config.getInt(TIMEOUT_MS);
+    queueSize = config.getInt(QUEUE_SIZE);
+    semaphore = new Semaphore(queueSize, true);
+    minimumOperatorMemory = context.getOptionManager()
+        .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+  }
+
+  @Override
+  public boolean enabled() { return true; }
+
+  @Override
+  public void setMemoryPerNode(long memoryPerNode) {
+    memoryPerQuery = memoryPerNode / queueSize;
+  }
+
+  @Override
+  public long defaultQueryMemoryPerNode(double cost) {
+    return memoryPerQuery;
+  }
+
+  @Override
+  public QueueLease enqueue(QueryId queryId, double cost)
+      throws QueueTimeoutException, QueryQueueException {
+    try {
+      if (! semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS) ) {
+        throw new QueueTimeoutException(queryId, "embedded", queueTimeoutMs);
+      }
+    } catch (InterruptedException e) {
+      throw new QueryQueueException("Interrupted", e);
+    }
+    return new EmbeddedQueueLease(queryId, memoryPerQuery);
+  }
+
+  private void release(EmbeddedQueueLease lease) {
+    assert ! lease.released;
+    semaphore.release();
+  }
+
+  @Override
+  public void close() {
+    assert semaphore.availablePermits() == queueSize;
+  }
+
+  @Override
+  public long minimumOperatorMemory() {
+    return minimumOperatorMemory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
new file mode 100644
index 0000000..72dc2d6
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+/**
+ * Interface which defines a queue implementation for query queues.
+ * Implementations can queue locally, queue distributed, or do
+ * nothing at all.
+ * <p>
+ * A queue can report itself as enabled or disabled. When enabled,
+ * all queries must obtain a lease prior to starting execution. The
+ * lease must be released at the completion of execution.
+ */
+
+public interface QueryQueue {
+
+  /**
+   * The opaque lease returned once a query is admitted
+   * for execution.
+   */
+
+  public interface QueueLease {
+
+    /**
+     * @return the memory per node assigned to the query holding this lease
+     */
+
+    long queryMemoryPerNode();
+
+    /**
+     * Release a query lease obtained from
+     * {@link QueryQueue#enqueue(QueryId, double)}.
+     * Should be called by the per-query resource manager.
+     */
+
+    void release();
+
+    /**
+     * @return the name of the queue that granted this lease
+     */
+
+    String queueName();
+  }
+
+  /**
+   * Exception thrown if a query exceeds the configured wait time
+   * in the query queue.
+   */
+
+  @SuppressWarnings("serial")
+  public class QueueTimeoutException extends Exception {
+
+    private final QueryId queryId;
+    private final String queueName;
+    private final int timeoutMs;
+
+    /**
+     * @param queryId the query that timed out
+     * @param queueName the queue the query was waiting in
+     * @param timeoutMs the wait time, in milliseconds, that was exceeded
+     */
+
+    public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) {
+      super( String.format(
+          "Query timed out of the %s queue after %d ms.",
+          queueName, timeoutMs ));
+      this.queryId = queryId;
+      this.queueName = queueName;
+      this.timeoutMs = timeoutMs;
+    }
+
+    public QueryId queryId() { return queryId; }
+    public String queueName() { return queueName; }
+    public int timeoutMs() { return timeoutMs; }
+  }
+
+  /**
+   * Exception thrown for all non-timeout error conditions.
+   */
+
+  @SuppressWarnings("serial")
+  public class QueryQueueException extends Exception {
+    QueryQueueException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+
+  /**
+   * Tell the queue how much memory each node has to divide among queries.
+   *
+   * @param memoryPerNode memory per node
+   */
+
+  void setMemoryPerNode(long memoryPerNode);
+
+  /**
+   * Return the amount of memory per node when creating a EXPLAIN
+   * query plan. Plans to be executed should get the query memory from
+   * the lease, as the lease may adjust the default amount on a per-query
+   * basis. This means that the memory used to execute the query may
+   * differ from the amount shown in an EXPLAIN plan.
+   *
+   * @param cost the cost of the query
+   * @return assumed memory per node, in bytes, to use when creating
+   * an EXPLAIN plan
+   */
+
+  long defaultQueryMemoryPerNode(double cost);
+
+  /**
+   * Optional floor on the amount of memory assigned per operator.
+   * This ensures that operators receive a certain amount, separate from
+   * any memory slicing. This can oversubscribe node memory if used
+   * incorrectly.
+   *
+   * @return minimum per-operator memory, in bytes
+   */
+
+  long minimumOperatorMemory();
+
+  /**
+   * Determine if the queue is enabled.
+   * @return true if the queue is enabled, false otherwise.
+   */
+
+  boolean enabled();
+
+  /**
+   * Queue a query. The method returns only when the query is admitted for
+   * execution. As a result, the calling thread may block up to the configured
+   * wait time.
+   * @param queryId the query ID
+   * @param cost the cost of the query used for cost-based queueing
+   * @return the query lease, whose {@link QueueLease#release()} method
+   * must be called upon query completion
+   * @throws QueueTimeoutException if the query times out waiting to be
+   * admitted.
+   * @throws QueryQueueException for any other error condition.
+   */
+
+  QueueLease enqueue(QueryId queryId, double cost) throws QueueTimeoutException, QueryQueueException;
+
+  /**
+   * Close the queue.
+   */
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
new file mode 100644
index 0000000..35dbe59
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+/**
+ * Manages resources for an individual query in conjunction with the
+ * global {@link ResourceManager}. Handles memory and CPU allocation.
+ * Instances of this class handle query planning and are used when the
+ * client wants to plan the query, but not execute it. An implementation
+ * of {@link QueryResourceManager} is used to both plan the query and
+ * queue it for execution.
+ * <p>
+ * This interface allows a variety of resource management strategies to
+ * exist for different purposes.
+ * <p>
+ * The methods here assume external synchronization: a single query calls
+ * the methods at known times; there are no concurrent calls.
+ */
+
+public interface QueryResourceAllocator {
+
+  /**
+   * Make any needed adjustments to the query plan before parallelization.
+   *
+   * @param plan the physical plan, not yet parallelized
+   */
+  void visitAbstractPlan(PhysicalPlan plan);
+
+  /**
+   * Provide the manager with the physical plan and node assignments
+   * for the query to be run. This class will plan memory for the query.
+   *
+   * @param work the work unit of the query to be run; it is the only
+   * argument, no separate plan is passed
+   */
+
+  void visitPhysicalPlan(QueryWorkUnit work);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
new file mode 100644
index 0000000..9e2a3a1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+/**
+ * Extends a {@link QueryResourceAllocator} to provide queueing support.
+ */
+
+public interface QueryResourceManager extends QueryResourceAllocator {
+
+  /**
+   * Hint that this resource manager queues. Allows the Foreman
+   * to short-circuit expensive logic if no queuing will actually
+   * be done. This is a static attribute per Drillbit run.
+   */
+
+  boolean hasQueue();
+
+  /**
+   * For some cases the foreman does not have a full plan, just a cost. In
+   * this case, this object will not plan memory, but still needs the cost
+   * to place the job into the correct queue.
+   * @param cost the query cost, used to select the queue for the query
+   */
+
+  void setCost(double cost);
+
+  /**
+   * Admit the query into the cluster. Blocks until the query
+   * can run. (Later revisions may use a more thread-friendly
+   * approach.)
+   * @throws QueryQueueException if something goes wrong with the
+   * queue mechanism
+   * @throws QueueTimeoutException if the query timed out waiting to
+   * be admitted.
+   */
+
+  void admit() throws QueueTimeoutException, QueryQueueException;
+
+  /**
+   * Returns the name of the queue (if any) on which the query was
+   * placed. Valid only after the query is admitted.
+   *
+   * @return queue name, or null if queuing is not enabled.
+   */
+
+  String queueName();
+
+  /**
+   * Mark the query as completing, giving up its slot in the
+   * cluster. Releases any lease that may be held for a system with queues.
+   */
+
+  void exit();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
new file mode 100644
index 0000000..71dabaf
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Drillbit-wide resource manager shared by all queries. Manages memory (at
+ * present) and CPU (planned). Since queries are the primary consumer of
+ * resources, manages resources by throttling queries into the system, and
+ * allocating resources to queries in order to control total use. A "null"
+ * implementation handles the case of no queuing. Clearly, the null case cannot
+ * effectively control resource use.
+ */
+
+public interface ResourceManager {
+
+  /**
+   * Returns the memory, in bytes, assigned to each node in a Drill cluster.
+   * Drill requires that nodes are symmetrical. So, knowing the memory on any
+   * one node also gives the memory on all other nodes.
+   *
+   * @return the memory, in bytes, available in each Drillbit
+   */
+  long memoryPerNode();
+
+  int cpusPerNode();
+
+  /**
+   * Create a resource manager to prepare or describe a query. In this form, no
+   * queuing is done, but the plan is created as if queuing had been done. Used
+   * when executing EXPLAIN PLAN.
+   *
+   * @return a resource manager for the query
+   */
+
+  QueryResourceAllocator newResourceAllocator(QueryContext queryContext);
+
+  /**
+   * Create a resource manager to execute a query.
+   *
+   * @param foreman
+   *          Foreman which manages the execution
+   * @return a resource manager for the query
+   */
+
+  QueryResourceManager newQueryRM(final Foreman foreman);
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
new file mode 100644
index 0000000..4305891
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Builds the proper resource manager and queue implementation for the configured
+ * system options.
+ * <p>
+ * <ul>
+ * <li>If the Drillbit is embedded<ul>
+ * <li>If queues are enabled, then the admission-controlled resource manager
+ * with the local query queue.</li>
+ * <li>Otherwise, the default resource manager and no queues.</li>
+ * </ul></li>
+ * <li>If the Drillbit is in a cluster<ul>
+ * <li>If queues are enabled, then the admission-controlled resource manager
+ * with the distributed query queue.</li>
+ * <li>Otherwise, the default resource manager and no queues.</li>
+ * </ul></li>
+ * </ul>
+ * Configuration settings:
+ * <dl>
+ * <dt>Cluster coordinator instance</dt>
+ * <dd>If an instance of <tt>LocalClusterCoordinator</tt>, the Drillbit is
+ * embedded, else it is in a cluster.</dd>
+ * <dt><tt>drill.exec.queue.embedded.enable</tt> boot config</dt>
+ * <dd>If enabled, and if embedded, then use the local queue.</dd>
+ * <dt><tt>exec.queue.enable</tt> system option</dt>
+ * <dd>If enabled, and if in a cluster, then use the distributed queue.</dd>
+ * </dl>
+ */
+public class ResourceManagerBuilder {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ResourceManagerBuilder.class);
+
+  private DrillbitContext context ;
+
+  public ResourceManagerBuilder(final DrillbitContext context) {
+    this.context = context;
+  }
+
+  @SuppressWarnings("resource")
+  public ResourceManager build() {
+    ClusterCoordinator coord = context.getClusterCoordinator();
+    DrillConfig config = context.getConfig();
+    if (coord instanceof LocalClusterCoordinator) {
+      if (config.getBoolean(EmbeddedQueryQueue.ENABLED)) {
+        logger.debug("Enabling embedded, local query queue.");
+        return new ThrottledResourceManager(context, new 
EmbeddedQueryQueue(context));
+      } else {
+        logger.debug("No query queueing enabled.");
+        return new DefaultResourceManager();
+      }
+    } else {
+      return new DynamicResourceManager(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
new file mode 100644
index 0000000..b46fe09
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Global resource manager that provides basic admission control (AC) via a
+ * configured queue: either the Zookeeper-based distributed queue or the
+ * in-process embedded Drillbit queue. The queue places an upper limit on the
+ * number of running queries. This limit then "slices" memory and CPU between
+ * queries: each gets the same share of resources.
+ * <p>
+ * This is a "basic" implementation. Clearly, a more advanced implementation
+ * could look at query cost to determine whether to give a given query more or
+ * less than the "standard" share. That is left as a future exercise; in this
+ * version we just want to get the basics working.
+ * <p>
+ * This is the resource manager level. This resource manager is paired with a
+ * queue implementation to produce a complete solution. This composition-based
+ * approach allows sharing of functionality across queue implementations.
+ */
+
+public class ThrottledResourceManager extends AbstractResourceManager {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(ThrottledResourceManager.class);
+
+  public static class QueuedResourceAllocator
+      implements QueryResourceAllocator {
+
+    protected final ThrottledResourceManager rm;
+    protected QueryContext queryContext;
+    protected PhysicalPlan plan;
+    protected QueryWorkUnit work;
+    protected double queryCost;
+
+    protected QueuedResourceAllocator(final ThrottledResourceManager rm,
+        QueryContext queryContext) {
+      this.rm = rm;
+      this.queryContext = queryContext;
+    }
+
+    @Override
+    public void visitAbstractPlan(PhysicalPlan plan) {
+      this.plan = plan;
+      queryCost = plan.totalCost();
+    }
+
+    @Override
+    public void visitPhysicalPlan(final QueryWorkUnit work) {
+      this.work = work;
+      planMemory();
+    }
+
+    private void planMemory() {
+      if (plan.getProperties().hasResourcePlan) {
+        logger.debug("Memory already planned.");
+        return;
+      }
+
+      // Group fragments by node.
+
+      Map<String, Collection<PhysicalOperator>> nodeMap = buildBufferedOpMap();
+
+      // Memory must be symmetric to avoid bottlenecks in which one node has
+      // sorts (say) with less memory than another, causing skew in data arrival
+      // rates for downstream operators.
+
+      int width = countBufferingOperators(nodeMap);
+
+      // Then, share memory evenly across all the buffered operators on
+      // each node. This handles asymmetric distribution, such as occurs
+      // if a sort appears in the root fragment (the one with screen),
+      // which is never parallelized: the widest node sets the operator
+      // count used to divide memory on every node.
+
+      for (Entry<String, Collection<PhysicalOperator>> entry : 
nodeMap.entrySet()) {
+        planNodeMemory(entry.getKey(), entry.getValue(), width);
+      }
+    }
+
+    private int countBufferingOperators(
+        Map<String, Collection<PhysicalOperator>> nodeMap) {
+      int width = 0;
+      for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) {
+        width = Math.max(width, fragSorts.size());
+      }
+      return width;
+    }
+
+    /**
+     * Given the set of buffered operators (from any number of fragments) on a
+     * single node, share the per-query memory equally across all the
+     * operators.
+     *
+     * @param nodeAddr
+     * @param bufferedOps
+     * @param width
+     */
+
+    private void planNodeMemory(String nodeAddr,
+        Collection<PhysicalOperator> bufferedOps, int width) {
+
+      // If no buffering operators, nothing to plan.
+
+      if (bufferedOps.isEmpty()) {
+        return;
+      }
+
+      // Divide node memory evenly among the set of operators, in any minor
+      // fragment, on the node. This is not very sophisticated: it does not
+      // deal with, say, three stacked sorts in which, if sort A runs, then
+      // B may be using memory, but C cannot be active. That kind of analysis
+      // is left as a later exercise.
+
+      long nodeMemory = queryMemoryPerNode();
+
+      // Set a floor on the amount of memory per operator based on the
+      // configured minimum. This is likely unhelpful because we are trying
+      // to work around constrained memory by assuming more than we actually
+      // have. This may lead to an OOM at run time.
+
+      long preferredOpMemory = nodeMemory / width;
+      long perOpMemory = Math.max(preferredOpMemory, 
rm.minimumOperatorMemory());
+      if (preferredOpMemory < perOpMemory) {
+        logger.warn("Preferred per-operator memory: {}, actual amount: {}",
+            preferredOpMemory, perOpMemory);
+      }
+      logger.debug(
+          "Query: {}, Node: {}, allocating {} bytes each for {} buffered 
operator(s).",
+          QueryIdHelper.getQueryId(queryContext.getQueryId()), nodeAddr,
+          perOpMemory, width);
+
+      for (PhysicalOperator op : bufferedOps) {
+
+        // Limit the memory to the maximum in the plan. Doing so is
+        // likely unnecessary, and perhaps harmful, because the pre-planned
+        // allocation is the default maximum hard-coded to 10 GB. This means
+        // that even if 20 GB is available to the sort, it won't use more
+        // than 10GB. This is probably more of a bug than a feature.
+
+        long alloc = Math.min(perOpMemory, op.getMaxAllocation());
+
+        // Place a floor on the memory that is the initial allocation,
+        // since we don't want the operator to run out of memory when it
+        // first starts.
+
+        alloc = Math.max(alloc, op.getInitialAllocation());
+
+        if (alloc > preferredOpMemory && alloc != perOpMemory) {
+          logger.warn("Allocated memory of {} for {} exceeds available memory 
of {} " +
+                      "due to operator minimum",
+              alloc, op.getClass().getSimpleName(), preferredOpMemory);
+        }
+        else if (alloc < preferredOpMemory) {
+          logger.warn("Allocated memory of {} for {} is less than available 
memory " +
+              "of {} due to operator limit",
+              alloc, op.getClass().getSimpleName(), preferredOpMemory);
+        }
+        op.setMaxAllocation(alloc);
+      }
+    }
+
+    protected long queryMemoryPerNode() {
+      return rm.defaultQueryMemoryPerNode(plan.totalCost());
+    }
+
+    /**
+     * Build a map of buffered operators grouped by node. We start with a list
+     * of minor fragments, each with an endpoint (node). Multiple minor
+     * fragments may appear on each node, and each minor fragment may have 0,
+     * 1 or more buffered operators.
+     *
+     * @return
+     */
+
+    private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() {
+      Multimap<String, PhysicalOperator> map = ArrayListMultimap.create();
+      getBufferedOps(map, work.getRootFragmentDefn());
+      for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) {
+        getBufferedOps(map, defn);
+      }
+      return map.asMap();
+    }
+
+    /**
+     * Searches a fragment's operator tree to find its buffered operators.
+     */
+
+    protected static class BufferedOpFinder extends
+        AbstractPhysicalVisitor<Void, List<PhysicalOperator>, 
RuntimeException> {
+      @Override
+      public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+          throws RuntimeException {
+        if (op.isBufferedOperator()) {
+          value.add(op);
+        }
+        visitChildren(op, value);
+        return null;
+      }
+    }
+
+    private void getBufferedOps(Multimap<String, PhysicalOperator> map,
+        MinorFragmentDefn defn) {
+      List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root());
+      if (!bufferedOps.isEmpty()) {
+        map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps);
+      }
+    }
+
+    /**
+     * Search an individual fragment tree to find any buffered operators it may
+     * contain.
+     *
+     * @param root
+     * @return
+     */
+
+    private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
+      List<PhysicalOperator> bufferedOps = new ArrayList<>();
+      BufferedOpFinder finder = new BufferedOpFinder();
+      root.accept(finder, bufferedOps);
+      return bufferedOps;
+    }
+  }
+
+  /**
+   * Per-query resource manager. Handles resources and optional queue lease for
+   * a single query. As such, this is a non-shared resource: it is associated
+   * with a Foreman: a single thread at plan time, and a single event (in some
+   * thread) at query completion time. Because of these semantics, no
+   * synchronization is needed within this class.
+   */
+
+  public static class QueuedQueryResourceManager extends 
QueuedResourceAllocator
+      implements QueryResourceManager {
+
+    private final Foreman foreman;
+    private QueueLease lease;
+
+    public QueuedQueryResourceManager(final ThrottledResourceManager rm,
+        final Foreman foreman) {
+      super(rm, foreman.getQueryContext());
+      this.foreman = foreman;
+    }
+
+    @Override
+    public void setCost(double cost) {
+      this.queryCost = cost;
+    }
+
+    @Override
+    public void admit() throws QueueTimeoutException, QueryQueueException {
+      lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
+    }
+
+    @Override
+    protected long queryMemoryPerNode() {
+
+      // No lease: use static estimate.
+
+      if (lease == null) {
+        return super.queryMemoryPerNode();
+      }
+
+      // Use actual memory assigned to this query.
+
+      return lease.queryMemoryPerNode();
+    }
+
+    @Override
+    public void exit() {
+      if (lease != null) {
+        lease.release();
+      }
+      lease = null;
+    }
+
+    @Override
+    public boolean hasQueue() { return true; }
+
+    @Override
+    public String queueName() {
+      return lease == null ? null : lease.queueName();
+    }
+  }
+
+  private final QueryQueue queue;
+
+  public ThrottledResourceManager(final DrillbitContext drillbitContext,
+      final QueryQueue queue) {
+    super(drillbitContext);
+    this.queue = queue;
+    queue.setMemoryPerNode(memoryPerNode());
+  }
+
+  public long minimumOperatorMemory() {
+    return queue.minimumOperatorMemory();
+  }
+
+  public long defaultQueryMemoryPerNode(double cost) {
+    return queue.defaultQueryMemoryPerNode(cost);
+  }
+
+  public QueryQueue queue() { return queue; }
+
+  @Override
+  public QueryResourceAllocator newResourceAllocator(
+      QueryContext queryContext) {
+    return new QueuedResourceAllocator(this, queryContext);
+  }
+
+  @Override
+  public QueryResourceManager newQueryRM(Foreman foreman) {
+    return new QueuedQueryResourceManager(this, foreman);
+  }
+
+  @Override
+  public void close() {
+    queue.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bbc42240/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
new file mode 100644
index 0000000..0b9e9da
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides resource management and queuing support for the Drill foreman.
+ * The resource manager tracks total resources available to Drill. Several
+ * implementations are available: a default implementation for systems without
+ * queueing and an admission-controlled (AC) version for systems with queues.
+ * <p>
+ * Each resource manager provides a per-query manager that is responsible
+ * for queuing the query (if needed) and memory allocation to the query based
+ * on query characteristics and memory assigned to the query.
+ * <p>
+ * Provides two different queue implementations. A distributed ZooKeeper queue
+ * and a local queue useful for embedded Drillbits (and for testing.)
+ */
+package org.apache.drill.exec.work.foreman.rm;

Reply via email to