Github user arina-ielchiieva commented on a diff in the pull request:
https://github.com/apache/drill/pull/928#discussion_r142726899
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import
org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;
+
+/**
+ * Wrapper around the default and/or distributed resource managers
+ * to allow dynamically enabling and disabling queueing.
+ */
+
+public class DynamicResourceManager implements ResourceManager {
+
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
+
+ private final DrillbitContext context;
+ private ResourceManager defaultRm;
+ private ResourceManager queueingRm;
+ private ResourceManager activeRm;
+ public long lastUpdateTime;
+ public int recheckDelayMs = 5000;
+
+ public DynamicResourceManager(final DrillbitContext context) {
+ this.context = context;
+ refreshRM();
+ }
+
+ public synchronized ResourceManager activeRM() {
+ refreshRM();
+ return activeRm;
+ }
+
+ @Override
+ public long memoryPerNode() {
+ return activeRm.memoryPerNode();
+ }
+
+ @Override
+ public int cpusPerNode() {
+ return activeRm.cpusPerNode();
+ }
+
+ @Override
+ public synchronized QueryResourceAllocator
newResourceAllocator(QueryContext queryContext) {
+ refreshRM();
+ return activeRm.newResourceAllocator(queryContext);
+ }
+
+ @Override
+ public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
+ refreshRM();
+ return activeRm.newQueryRM(foreman);
+ }
+
+ private void refreshRM() {
+ long now = System.currentTimeMillis();
+ if (lastUpdateTime + recheckDelayMs >= now) {
+ return;
+ }
+ lastUpdateTime = now;
+ @SuppressWarnings("resource")
+ SystemOptionManager systemOptions = context.getOptionManager();
+ if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
+ if (queueingRm == null) {
+ StatusAdapter statusAdapter = new StatusAdapter() {
+ @Override
+ public boolean inShutDown() {
+ // Drill provides no shutdown state at present. Once
+ // DRILL-4286 (graceful shutdown) is merged, use the
+ // new Drillbit status to determine when the Drillbit
+ // is shutting down.
+ return false;
+ }
+ };
+ queueingRm = new ThrottledResourceManager(context,
+ new DistributedQueryQueue(context, statusAdapter));
+ }
+ if (activeRm != queueingRm) {
+ logger.debug("Enabling ZK-based query queue.");
+ activeRm = queueingRm;
+ }
+ } else {
+ if (defaultRm == null) {
+ defaultRm = new DefaultResourceManager();
+ }
+ if (activeRm != defaultRm) {
+ logger.debug("Disabling ZK-based query queue.");
+ activeRm = defaultRm;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (defaultRm != null) {
+ defaultRm.close();
+ defaultRm = null;
+ }
--- End diff --
What if `defaultRM` closing fails, is it ok that `queueingRm` remains
unclosed?
---