Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/928#discussion_r142726899
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
 ---
    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman.rm;
    +
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.ops.QueryContext;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.server.options.SystemOptionManager;
    +import org.apache.drill.exec.work.foreman.Foreman;
    +import 
org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;
    +
    +/**
    + * Wrapper around the default and/or distributed resource managers
    + * to allow dynamically enabling and disabling queueing.
    + */
    +
    +public class DynamicResourceManager implements ResourceManager {
    +
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
    +
    +  private final DrillbitContext context; // Supplied at construction; source of system options and argument to the queueing RM.
    +  private ResourceManager defaultRm; // Created lazily by refreshRM() the first time queueing is found disabled.
    +  private ResourceManager queueingRm; // Created lazily by refreshRM() the first time queueing is found enabled.
    +  private ResourceManager activeRm; // Whichever of defaultRm / queueingRm is currently selected; all calls delegate to it.
    +  public long lastUpdateTime; // System.currentTimeMillis() value of the last option check. NOTE(review): public and mutable, presumably for tests; confirm.
    +  public int recheckDelayMs = 5000; // Minimum interval, in ms, between option checks in refreshRM().
    +
    +  /**
    +   * Builds the wrapper and immediately selects the initial active resource
    +   * manager based on the current system options.
    +   *
    +   * @param context Drillbit context used to read system options and to
    +   *                construct the queueing resource manager on demand
    +   */
    +  public DynamicResourceManager(final DrillbitContext context) {
    +    this.context = context;
    +    // lastUpdateTime is still 0 here, so this first refresh is never throttled.
    +    refreshRM();
    +  }
    +
    +  /**
    +   * Returns the resource manager currently in effect, first re-checking
    +   * (subject to the recheck delay) whether queueing has been toggled.
    +   *
    +   * @return the resource manager that calls are presently delegated to
    +   */
    +  public synchronized ResourceManager activeRM() {
    +    refreshRM();
    +    final ResourceManager current = activeRm;
    +    return current;
    +  }
    +
    +  /**
    +   * Reports the per-node memory figure of the currently active resource
    +   * manager. Does not re-check the queue option; it uses whichever manager
    +   * the most recent refresh selected.
    +   *
    +   * @return the value of {@code memoryPerNode()} on the active manager
    +   */
    +  @Override
    +  public synchronized long memoryPerNode() {
    +    // activeRm is only ever written while holding this object's monitor
    +    // (refreshRM() is called from synchronized methods), so read it under
    +    // the same monitor to avoid observing a stale or unsafely published
    +    // reference.
    +    return activeRm.memoryPerNode();
    +  }
    +
    +  /**
    +   * Reports the per-node CPU count of the currently active resource
    +   * manager. Does not re-check the queue option; it uses whichever manager
    +   * the most recent refresh selected.
    +   *
    +   * @return the value of {@code cpusPerNode()} on the active manager
    +   */
    +  @Override
    +  public synchronized int cpusPerNode() {
    +    // activeRm is only ever written while holding this object's monitor
    +    // (refreshRM() is called from synchronized methods), so read it under
    +    // the same monitor to avoid observing a stale or unsafely published
    +    // reference.
    +    return activeRm.cpusPerNode();
    +  }
    +
    +  /**
    +   * Creates a resource allocator for a query, using whichever resource
    +   * manager is active after re-checking the queue option.
    +   *
    +   * @param queryContext context of the query needing an allocator
    +   * @return the allocator produced by the active resource manager
    +   */
    +  @Override
    +  public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
    +    refreshRM();
    +    final ResourceManager delegate = activeRm;
    +    return delegate.newResourceAllocator(queryContext);
    +  }
    +
    +  /**
    +   * Creates a per-query resource manager for the given foreman, using
    +   * whichever resource manager is active after re-checking the queue option.
    +   *
    +   * @param foreman the foreman running the query
    +   * @return the query resource manager produced by the active manager
    +   */
    +  @Override
    +  public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
    +    refreshRM();
    +    final ResourceManager delegate = activeRm;
    +    return delegate.newQueryRM(foreman);
    +  }
    +
    +  /**
    +   * Re-reads the queue-enable system option and points {@code activeRm} at
    +   * the matching resource manager, creating that manager on first use.
    +   * The option is consulted at most once per {@code recheckDelayMs}
    +   * milliseconds; calls inside that window return without doing anything.
    +   */
    +  private void refreshRM() {
    +    final long currentTime = System.currentTimeMillis();
    +    if (currentTime <= lastUpdateTime + recheckDelayMs) {
    +      // Checked recently enough; keep the current selection.
    +      return;
    +    }
    +    lastUpdateTime = currentTime;
    +
    +    @SuppressWarnings("resource")
    +    SystemOptionManager options = context.getOptionManager();
    +    if (!options.getOption(ExecConstants.ENABLE_QUEUE)) {
    +      // Queueing is off: use the lazily created default manager.
    +      if (defaultRm == null) {
    +        defaultRm = new DefaultResourceManager();
    +      }
    +      if (activeRm != defaultRm) {
    +        logger.debug("Disabling ZK-based query queue.");
    +        activeRm = defaultRm;
    +      }
    +      return;
    +    }
    +
    +    // Queueing is on: build the throttled manager the first time through.
    +    if (queueingRm == null) {
    +      StatusAdapter shutdownProbe = new StatusAdapter() {
    +        @Override
    +        public boolean inShutDown() {
    +          // Drill has no shutdown state yet. After DRILL-4286
    +          // (graceful shutdown) lands, report the real Drillbit
    +          // status here instead of a constant.
    +          return false;
    +        }
    +      };
    +      DistributedQueryQueue queue = new DistributedQueryQueue(context, shutdownProbe);
    +      queueingRm = new ThrottledResourceManager(context, queue);
    +    }
    +    if (activeRm != queueingRm) {
    +      logger.debug("Enabling ZK-based query queue.");
    +      activeRm = queueingRm;
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    if (defaultRm != null) {
    +      defaultRm.close();
    +      defaultRm = null;
    +    }
    --- End diff --
    
    What if closing `defaultRm` fails? Is it OK that `queueingRm` then remains
unclosed?


---

Reply via email to