Github user arina-ielchiieva commented on a diff in the pull request:
https://github.com/apache/drill/pull/928#discussion_r142946871
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+
+/**
+ * Distributed query queue which uses a ZooKeeper distributed semaphore to
+ * control queuing across the cluster. The distributed queue is actually two
+ * queues: one for "small" queries, another for "large" queries. Query size
+ * is determined by the planner's estimate of query cost.
+ * <p>
+ * This queue is configured using system options:
+ * <dl>
+ * <dt><tt>exec.queue.enable</tt></dt>
+ * <dd>Set to true to enable the distributed queue.</dd>
+ * <dt><tt>exec.queue.large</tt></dt>
+ * <dd>The maximum number of large queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.small</tt></dt>
+ * <dd>The maximum number of small queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.threshold</tt></dt>
+ * <dd>The cost threshold. Queries below this size are small; queries at
+ * or above this size are large.</dd>
+ * <dt><tt>exec.queue.timeout_millis</tt></dt>
+ * <dd>The maximum time (in milliseconds) a query will wait in the
+ * queue before failing.</dd>
+ * </dl>
+ * <p>
+ * The above values are refreshed every five seconds. This aids performance
+ * a bit in systems with very high query arrival rates.
+ */
+
+public class DistributedQueryQueue implements QueryQueue {
+ // Class-wide SLF4J logger used for queue placement, timeout and error messages.
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
+
+ /**
+  * Lease describing one admitted query: which queue admitted it, the
+  * underlying distributed-semaphore lease, and the per-node memory the
+  * query was granted. Releasing the lease is delegated to the enclosing
+  * queue.
+  */
+ private class DistributedQueueLease implements QueueLease {
+   private final QueryId queryId;
+   private final String queueName;
+   private DistributedLease lease;
+
+   /**
+    * Memory allocated to the query. All queries in a queue follow the same
+    * allocation rules, but those rules may change whenever the user edits
+    * system options; this field holds the value that was computed when
+    * this lease was granted.
+    */
+   private long queryMemory;
+
+   public DistributedQueueLease(QueryId queryId, String queueName,
+       DistributedLease lease, long queryMemory) {
+     this.queryId = queryId;
+     this.queueName = queueName;
+     this.queryMemory = queryMemory;
+     this.lease = lease;
+   }
+
+   @Override
+   public String queueName() {
+     return queueName;
+   }
+
+   @Override
+   public long queryMemoryPerNode() {
+     return queryMemory;
+   }
+
+   @Override
+   public void release() {
+     // The enclosing queue owns the release logic.
+     DistributedQueryQueue.this.release(this);
+   }
+
+   @Override
+   public String toString() {
+     final String id = QueryIdHelper.getQueryId(queryId);
+     return String.format("Lease for %s queue to query %s", queueName, id);
+   }
+ }
+
+ /**
+  * Exposes a snapshot of internal state information for use in status
+  * reporting, such as in the UI. All fields are copied once, at construction
+  * time, from the given queue.
+  */
+ @XmlRootElement
+ public static class ZKQueueInfo {
+   public final int smallQueueSize;
+   public final int largeQueueSize;
+   public final double queueThreshold;
+   public final long memoryPerNode;
+   public final long memoryPerSmallQuery;
+   public final long memoryPerLargeQuery;
+
+   /**
+    * Copies the current configuration and memory figures out of the queue.
+    *
+    * @param queue the queue whose state is reported
+    */
+   public ZKQueueInfo(DistributedQueryQueue queue) {
+     // Read the config reference exactly once so that the three values
+     // below all come from the same ConfigSet instance, even if the queue
+     // installs a new ConfigSet while this constructor runs.
+     final ConfigSet config = queue.configSet;
+     smallQueueSize = config.smallQueueSize;
+     largeQueueSize = config.largeQueueSize;
+     queueThreshold = config.queueThreshold;
+     memoryPerNode = queue.memoryPerNode;
+     memoryPerSmallQuery = queue.memoryPerSmallQuery;
+     memoryPerLargeQuery = queue.memoryPerLargeQuery;
+   }
+ }
+
+ /**
+  * Callback supplied to the queue's constructor and stored for later use.
+  * NOTE(review): the call sites are not visible in this part of the file.
+  */
+ public interface StatusAdapter {
+ // NOTE(review): presumably returns true while the server is shutting down — confirm.
+ boolean inShutDown();
+ }
+
+ /**
+ * Holds runtime configuration options. Allows polling the options
+ * for changes, and easily detecting changes.
+ */
+
+ private static class ConfigSet {
+ private final long queueThreshold;
+ private final long queueTimeout;
+ private final int largeQueueSize;
+ private final int smallQueueSize;
+ private final double largeToSmallRatio;
+ private final double reserveMemoryRatio;
+ private final long minimumOperatorMemory;
+
+ public ConfigSet(SystemOptionManager optionManager) {
+ queueThreshold =
optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+ queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+ largeQueueSize = (int)
optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
+ smallQueueSize = (int)
optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
+ largeToSmallRatio =
optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
+ reserveMemoryRatio =
optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
+ minimumOperatorMemory =
optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || ! (other instanceof ConfigSet)) {
+ return false;
+ }
+ ConfigSet otherSet = (ConfigSet) other;
+ return queueThreshold == otherSet.queueThreshold &&
+ queueTimeout == otherSet.queueTimeout &&
+ largeQueueSize == otherSet.largeQueueSize &&
+ smallQueueSize == otherSet.smallQueueSize &&
+ largeToSmallRatio == otherSet.largeToSmallRatio &&
+ reserveMemoryRatio == otherSet.reserveMemoryRatio &&
+ minimumOperatorMemory == otherSet.minimumOperatorMemory;
+ }
+ }
+
+ // Memory available per node; assigned in setMemoryPerNode().
+ private long memoryPerNode;
+ // Obtained from the DrillbitContext in the constructor.
+ private SystemOptionManager optionManager;
+ // Current set of queue option values consulted by the methods below.
+ private ConfigSet configSet;
+ // Provides the "query.small" / "query.large" distributed semaphores.
+ private ClusterCoordinator clusterCoordinator;
+ // Timestamp (ms) that refreshConfig() compares against the current time.
+ private long refreshTime;
+ // Per-node memory handed to a query placed in the small queue.
+ private long memoryPerSmallQuery;
+ // Per-node memory handed to a query placed in the large queue.
+ private long memoryPerLargeQuery;
+ // Status callback passed to the constructor.
+ private final StatusAdapter statusAdapter;
+
+ /**
+  * Creates the queue, taking the option manager and cluster coordinator
+  * from the Drillbit context.
+  *
+  * @param context Drillbit context supplying the option manager and the
+  *        cluster coordinator
+  * @param adapter status callback kept for later use
+  */
+ public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
+   this.optionManager = context.getOptionManager();
+   this.clusterCoordinator = context.getClusterCoordinator();
+   this.statusAdapter = adapter;
+ }
+
+ /**
+  * Records the memory available on each node and then calls
+  * {@code refreshConfig()}.
+  *
+  * @param memoryPerNode memory available per node
+  */
+ @Override
+ public void setMemoryPerNode(long memoryPerNode) {
+ this.memoryPerNode = memoryPerNode;
+ // Stored first so that the refresh below runs with the new value in place.
+ refreshConfig();
+ }
+
+ /**
+  * Returns the per-node memory a query of the given cost would receive:
+  * the small-query amount when the cost is below the queue threshold,
+  * otherwise the large-query amount.
+  *
+  * @param cost estimated query cost
+  * @return memory per node for a query of that cost
+  */
+ @Override
+ public long defaultQueryMemoryPerNode(double cost) {
+   if (cost < configSet.queueThreshold) {
+     return memoryPerSmallQuery;
+   }
+   return memoryPerLargeQuery;
+ }
+
+ @Override
+ public long minimumOperatorMemory() { return
configSet.minimumOperatorMemory; }
+
+ /**
+ * This limits the number of "small" and "large" queries that a Drill
cluster will run
+ * simultaneously, if queuing is enabled. If the query is unable to run,
this will block
+ * until it can. Beware that this is called under run(), and so will
consume a Thread
+ * while it waits for the required distributed semaphore.
+ *
+ * @param queryId query identifier
+ * @param totalCost the query plan
+ * @throws QueryQueueException
+ * @throws QueueTimeoutException
+ */
+
+ @SuppressWarnings("resource")
+ @Override
+ public QueueLease enqueue(QueryId queryId, double cost) throws
QueryQueueException, QueueTimeoutException {
+ final String queueName;
+ DistributedLease lease = null;
+ long queryMemory;
+ final DistributedSemaphore distributedSemaphore;
+ try {
+
+ // Only the refresh and queue computation is synchronized.
+
+ synchronized(this) {
+ refreshConfig();
+
+ // get the appropriate semaphore
+ if (cost >= configSet.queueThreshold) {
+ distributedSemaphore =
clusterCoordinator.getSemaphore("query.large", configSet.largeQueueSize);
+ queueName = "large";
+ queryMemory = memoryPerLargeQuery;
+ } else {
+ distributedSemaphore =
clusterCoordinator.getSemaphore("query.small", configSet.smallQueueSize);
+ queueName = "small";
+ queryMemory = memoryPerSmallQuery;
+ }
+ }
+ logger.debug("Query {} with cost {} placed into the {} queue.",
+ QueryIdHelper.getQueryId(queryId), cost, queueName);
+
+ lease = distributedSemaphore.acquire(configSet.queueTimeout,
TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ logger.error("Unable to acquire slot for query " +
+ QueryIdHelper.getQueryId(queryId), e);
+ throw new QueryQueueException("Unable to acquire slot for query.",
e);
+ }
+
+ if (lease == null) {
+ int timeoutSecs = (int) Math.round(configSet.queueTimeout/1000.0);
+ logger.warn("Queue timeout: {} after {} seconds.", queueName,
timeoutSecs);
+ throw new QueueTimeoutException(queryId, queueName, timeoutSecs);
+ }
+ return new DistributedQueueLease(queryId, queueName, lease,
queryMemory);
+ }
+
+ private synchronized void refreshConfig() {
+ long now = System.currentTimeMillis();
+ if (now < refreshTime) {
--- End diff --
The same here. Do we need a delay in the refresh?
---