Github user arina-ielchiieva commented on a diff in the pull request:
https://github.com/apache/drill/pull/928#discussion_r142721394
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+
+/**
+ * Distributed query queue which uses a Zookeeper distributed semaphore to
+ * control queuing across the cluster. The distributed queue is actually two
+ * queues: one for "small" queries, another for "large" queries. Query size is
+ * determined by the Planner's estimate of query cost.
+ * <p>
+ * This queue is configured using system options:
+ * <dl>
+ * <dt><tt>exec.queue.enable</tt>
+ * </dt>
+ * <dd>Set to true to enable the distributed queue.</dd>
+ * <dt><tt>exec.queue.large</tt>
+ * </dt>
+ * <dd>The maximum number of large queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.small</tt>
+ * </dt>
+ * <dd>The maximum number of small queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.threshold</tt>
+ * </dt>
+ * <dd>The cost threshold. Queries below this size are small, at
+ * or above this size are large.</dd>
+ * <dt><tt>exec.queue.timeout_millis</tt>
+ * </dt>
+ * <dd>The maximum time (in milliseconds) a query will wait in the
+ * queue before failing.</dd>
+ * </dl>
+ * <p>
+ * The above values are refreshed every five seconds. This aids performance
+ * a bit in systems with very high query arrival rates.
+ */
+
+public class DistributedQueryQueue implements QueryQueue {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
+
+ private class DistributedQueueLease implements QueueLease {
+
+   // Identity of the lease: which query holds it and in which queue.
+   private final QueryId queryId;
+   private final String queueName;
+
+   // Underlying distributed lease; intentionally not final.
+   private DistributedLease lease;
+
+   /**
+    * Per-query memory figure recorded when this lease was created. The
+    * allocation rules are shared by every query in the queue, but they may
+    * change whenever the user alters system options, so the value in
+    * effect at grant time is kept here.
+    */
+   private long queryMemory;
+
+   /**
+    * Creates a lease record.
+    *
+    * @param queryId id of the query holding the lease
+    * @param queueName name of the queue the lease belongs to
+    * @param lease the underlying distributed lease
+    * @param queryMemory memory value to report for the query
+    */
+   public DistributedQueueLease(QueryId queryId, String queueName,
+       DistributedLease lease, long queryMemory) {
+     this.lease = lease;
+     this.queryMemory = queryMemory;
+     this.queueName = queueName;
+     this.queryId = queryId;
+   }
+
+   /** @return the name of the queue this lease belongs to */
+   @Override
+   public String queueName() {
+     return queueName;
+   }
+
+   /** @return the memory value recorded for this lease */
+   @Override
+   public long queryMemoryPerNode() {
+     return queryMemory;
+   }
+
+   /** Hands this lease back to the enclosing queue for release. */
+   @Override
+   public void release() {
+     DistributedQueryQueue.this.release(this);
+   }
+
+   /** @return a description naming the queue and the query id */
+   @Override
+   public String toString() {
+     final String readableId = QueryIdHelper.getQueryId(queryId);
+     return String.format("Lease for %s queue to query %s",
+         queueName, readableId);
+   }
+ }
+
+ /**
+  * Immutable copy of selected queue settings and memory figures, taken at
+  * construction time, for status reporting (for example, in the UI).
+  */
+ @XmlRootElement
+ public static class ZKQueueInfo {
+   // Values copied from the queue's configuration set.
+   public final int smallQueueSize;
+   public final int largeQueueSize;
+   public final double queueThreshold;
+
+   // Values copied from the queue's memory fields.
+   public final long memoryPerNode;
+   public final long memoryPerSmallQuery;
+   public final long memoryPerLargeQuery;
+
+   /**
+    * Copies the reportable values out of the given queue.
+    *
+    * @param queue the queue whose current values are captured
+    */
+   public ZKQueueInfo(DistributedQueryQueue queue) {
+     // Configuration-derived values.
+     this.smallQueueSize = queue.configSet.smallQueueSize;
+     this.largeQueueSize = queue.configSet.largeQueueSize;
+     this.queueThreshold = queue.configSet.queueThreshold;
+
+     // Memory values.
+     this.memoryPerNode = queue.memoryPerNode;
+     this.memoryPerSmallQuery = queue.memoryPerSmallQuery;
+     this.memoryPerLargeQuery = queue.memoryPerLargeQuery;
+   }
+ }
+
+ public interface StatusAdapter {
--- End diff --
Factor out in separate class? Since it is used in other classes as well.
---