Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/928#discussion_r142948996
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
 ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman.rm;
    +
    +import java.util.concurrent.Semaphore;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.helper.QueryIdHelper;
    +import org.apache.drill.exec.server.DrillbitContext;
    +
    +/**
    + * Query queue to be used in an embedded Drillbit. This queue has scope of 
only
    + * the one Drillbit (not even multiple Drillbits in the same process.) 
Primarily
    + * intended for testing, but may possibly be useful for other embedded
    + * applications.
    + * <p>
    + * Configuration is via config parameters (not via system options as for 
the
    + * distributed queue.)
    + * <dl>
    + * <dt><tt>drill.queue.embedded.enabled</tt></dt>
    + * <dd>Set to true to enable the embedded queue. But, this setting has 
effect
    + * only if the Drillbit is, in fact, embedded.</dd>
    + * <dt><tt>drill.queue.embedded.size</tt></dt>
    + * <dd>The number of active queries, all others queue. There is no upper 
limit
    + * on the number of queued entries.</dt>
    + * <dt><tt>drill.queue.embedded.timeout_ms</tt></dt>
    + * <dd>The maximum time a query will wait in the queue before failing.</dd>
    + * </dl>
    + */
    +
    +public class EmbeddedQueryQueue implements QueryQueue {
    +
    +  public static String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
    +  public static String ENABLED = EMBEDDED_QUEUE + ".enable";
    +  public static String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
    +  public static String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";
    +
    +  public class EmbeddedQueueLease implements QueueLease {
    +
    +    private final QueryId queryId;
    +    private boolean released;
    +    private long queryMemory;
    +
    +    public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
    +      this.queryId = queryId;
    +      this.queryMemory = queryMemory;
    +    }
    +
    +    @Override
    +    public String toString( ) {
    +      String msg = "Embedded queue lease for " +
    +          QueryIdHelper.getQueryId(queryId);
    +      if (released) {
    +        msg += " (released)";
    +      }
    +      return msg;
    +    }
    +
    +    @Override
    +    public long queryMemoryPerNode() {
    +      return queryMemory;
    +    }
    +
    +    @Override
    +    public void release() {
    +      EmbeddedQueryQueue.this.release(this);
    +    }
    +
    +    @Override
    +    public String queueName() { return "local-queue"; }
    +  }
    +
    +  private final int queueTimeoutMs;
    +  private final int queueSize;
    +  private final Semaphore semaphore;
    +  private long memoryPerQuery;
    +  private final long minimumOperatorMemory;
    +
    +  public EmbeddedQueryQueue(DrillbitContext context) {
    +    DrillConfig config = context.getConfig();
    +    queueTimeoutMs = config.getInt(TIMEOUT_MS);
    +    queueSize = config.getInt(QUEUE_SIZE);
    +    semaphore = new Semaphore(queueSize, true);
    +    minimumOperatorMemory = context.getOptionManager()
    +        .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
    +  }
    +
    +  @Override
    +  public boolean enabled() { return true; }
    +
    +  @Override
    +  public void setMemoryPerNode(long memoryPerNode) {
    +    memoryPerQuery = memoryPerNode / queueSize;
    +  }
    +
    +  @Override
    +  public long defaultQueryMemoryPerNode(double cost) {
    +    return memoryPerQuery;
    +  }
    +
    +  @Override
    +  public QueueLease enqueue(QueryId queryId, double cost)
    +      throws QueueTimeoutException, QueryQueueException {
    +    try {
    +      if (! semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS) ) {
    +        throw new QueueTimeoutException(queryId, "embedded", 
queueTimeoutMs);
    +      }
    +    } catch (InterruptedException e) {
    +      throw new QueryQueueException("Interrupted", e);
    +    }
    +    return new EmbeddedQueueLease(queryId, memoryPerQuery);
    +  }
    +
    +  private void release(QueueLease lease) {
    --- End diff --
    
    This method can accept `EmbeddedQueueLease` only to avoid casting.


---

Reply via email to