http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index e69f61c..bc8495c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -31,11 +31,11 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
@@ -357,28 +357,11 @@ public class ParquetGroupScan extends AbstractGroupScan {
     return columns;
   }
 
-  @Override
-  public OperatorCost getCost() {
-    // TODO Figure out how to properly calculate cost
-    return new OperatorCost(1, rowGroupInfos.size(), 1, 1);
-  }
 
   @Override
-  public Size getSize() {
-//    long totalSize = 0;
-//    for (RowGroupInfo rowGrpInfo : rowGroupInfos) {
-//      totalSize += rowGrpInfo.getTotalBytes();
-//    }
-//    int rowSize = (int) (totalSize/rowCount);
-
-    // if all the columns are required.
-    if (columns == null || columns.isEmpty()) {
-      return new Size(rowCount, columnValueCounts.size());
-    } else {
-      // project pushdown : subset of columns are required.
-      return new Size(rowCount, columns.size());
-    }
-
+  public ScanStats getScanStats() {
+    int columnCount = columns == null ? 20 : columns.size();
+    return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
   }
 
   @Override
@@ -412,11 +395,6 @@ public class ParquetGroupScan extends AbstractGroupScan {
     return true;
   }
 
-  @Override
-  public GroupScanProperty getProperty() {
-    return GroupScanProperty.EXACT_ROW_COUNT;
-  }
-
   /**
   *  Return column value count for the specified column. If does not contain such column, return 0.
    */
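
The hunk above folds the old getCost()/getSize()/getProperty() trio into a single getScanStats() call. For readability, here is the method as it reads after the patch, with comments on what each constructor argument appears to mean; the (property, recordCount, cpuCost, diskCost) ordering is inferred from this call site rather than from the ScanStats source, so treat it as an assumption.

  @Override
  public ScanStats getScanStats() {
    // No projection pushed down: fall back to a fixed estimate of 20 columns.
    int columnCount = columns == null ? 20 : columns.size();
    // EXACT_ROW_COUNT signals the row count is known from Parquet metadata, not estimated.
    // The trailing arguments are read here as a cpu cost of 1 and a disk cost of rowCount * columnCount.
    return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
  }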

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 1f66e9f..d1a086c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -22,15 +22,12 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -102,16 +99,6 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   }
 
   @Override
-  public OperatorCost getCost() {
-    return null;
-  }
-
-  @Override
-  public Size getSize() {
-    return null;
-  }
-
-  @Override
   public boolean isExecutable() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 67bab5e..75f0e74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -90,12 +89,6 @@ public class ParquetWriter extends AbstractWriter {
   }
 
   @Override
-  public OperatorCost getCost() {
-    // TODO:
-    return new OperatorCost(1,1,1,1);
-  }
-
-  @Override
   public int getOperatorType() {
     return CoreOperatorType.PARQUET_WRITER_VALUE;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 09aabb0..d260927 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -25,11 +25,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -62,14 +62,8 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
     this.plugin = plugin;
   }
 
-  @Override
-  public OperatorCost getCost() {
-    return new OperatorCost(1,1,1,1);
-  }
-
-  @Override
-  public Size getSize() {
-    return new Size(100,1);
+  public ScanStats getScanStats(){
+    return ScanStats.TRIVIAL_TABLE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 94e2cd4..3048f77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -22,14 +22,16 @@ import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache.CacheConfig;
 import org.apache.drill.exec.cache.DistributedCache.SerializationMode;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.QueryContext;
@@ -74,6 +76,7 @@ import com.google.common.collect.Lists;
 public class Foreman implements Runnable, Closeable, Comparable<Object>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
 
+
+  public static final CacheConfig<FragmentHandle, PlanFragment> FRAGMENT_CACHE = CacheConfig //
       .newBuilder(FragmentHandle.class, PlanFragment.class) //
       .mode(SerializationMode.PROTOBUF) //
@@ -86,12 +89,33 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   private WorkerBee bee;
   private UserClientConnection initiatingClient;
   private final AtomicState<QueryState> state;
+  private final DistributedSemaphore smallSemaphore;
+  private final DistributedSemaphore largeSemaphore;
+  private final long queueThreshold;
+  private final long queueTimeout;
+  private volatile DistributedLease lease;
+  private final boolean queuingEnabled;
 
   public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
       RunQuery queryRequest) {
     this.queryId = queryId;
     this.queryRequest = queryRequest;
     this.context = new QueryContext(connection.getSession(), queryId, dContext);
+    this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
+    if(queuingEnabled){
+      int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
+      int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
+      this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue);
+      this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue);
+      this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
+      this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
+    }else{
+      this.largeSemaphore = null;
+      this.smallSemaphore = null;
+      this.queueThreshold = 0;
+      this.queueTimeout = 0;
+    }
+
     this.initiatingClient = connection;
     this.fragmentManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(), new ForemanManagerListener(), dContext.getController(), this);
     this.bee = bee;
@@ -194,10 +218,21 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       System.out.flush();
       System.exit(-1);
     }finally{
+      releaseLease();
       Thread.currentThread().setName(originalThread);
     }
   }
 
+  private void releaseLease(){
+    if(lease != null){
+      try{
+        lease.close();
+      }catch(Exception e){
+        logger.warn("Failure while releasing lease.", e);
+      };
+    }
+
+  }
   private void parseAndRunLogicalPlan(String json) {
 
     try {
@@ -260,7 +295,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
 
   }
-
   private void parseAndRunPhysicalPlan(String json) {
     try {
       PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -272,6 +306,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   private void runPhysicalPlan(PhysicalPlan plan) {
 
+
+
+
     if(plan.getProperties().resultMode != ResultMode.EXEC){
       fail(String.format("Failure running plan.  You requested a result mode 
of %s and a physical plan can only be output as EXEC", 
plan.getProperties().resultMode), new Exception());
     }
@@ -287,12 +324,20 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
 
     PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
-    SimpleParallelizer parallelizer = new SimpleParallelizer()
-      .setGlobalMaxWidth(context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH))
-      .setMaxWidthPerEndpoint(context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT))
-      .setAffinityFactor(context.getConfig().getDouble(ExecConstants.AFFINITY_FACTOR));
+    SimpleParallelizer parallelizer = new SimpleParallelizer(context);
 
     try {
+
+      double size = 0;
+      for(PhysicalOperator ops : plan.getSortedOperators()){
+        size += ops.getCost();
+      }
+      if(queuingEnabled && size > this.queueThreshold){
+        this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+      }else{
+        this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+      }
+
       QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(),
           queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet);
 
@@ -324,7 +369,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       logger.debug("Fragments running.");
       state.updateState(QueryState.PENDING, QueryState.RUNNING);
 
-    } catch (ExecutionSetupException | RpcException e) {
+    } catch (Exception e) {
       fail("Failure while setting up query.", e);
     }
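
Taken together, the Foreman hunks add a simple admission-control step: the total planner cost of the physical plan is summed over plan.getSortedOperators(), and when queuing is enabled the query must acquire a lease from either the "query.small" or "query.large" distributed semaphore before fragments are sent out; releaseLease() closes the lease in run()'s finally block. A minimal sketch of that pattern follows, assuming the DistributedSemaphore/DistributedLease interfaces from the imports above; admitQuery and releaseQueueSlot are illustrative helper names, not methods added by this patch, and the sketch assumes it is only called when queuingEnabled is true.

  // Illustrative sketch of the queueing pattern introduced in this commit.
  private DistributedLease admitQuery(PhysicalPlan plan) throws Exception {
    double totalCost = 0;
    for (PhysicalOperator op : plan.getSortedOperators()) {
      totalCost += op.getCost();                      // sum planner cost across all operators
    }
    // Cheap plans contend for the small queue, expensive ones for the large queue.
    DistributedSemaphore queue = totalCost > queueThreshold ? largeSemaphore : smallSemaphore;
    // Blocks until a cluster-wide slot frees up or the configured timeout elapses.
    return queue.acquire(queueTimeout, TimeUnit.MILLISECONDS);
  }

  private void releaseQueueSlot(DistributedLease lease) {
    if (lease != null) {
      try {
        lease.close();                                // hand the slot back to the cluster-wide queue
      } catch (Exception e) {
        logger.warn("Failure while releasing lease.", e);
      }
    }
  }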
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 562d7e8..7cfe51a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.client.PrintingResultsListener;
@@ -99,8 +100,14 @@ public class BaseTestQuery extends ExecTest{
     bit.run();
     client = new DrillClient(config, serviceSet.getCoordinator());
     client.connect();
+    List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY));
+    for(QueryResultBatch b : results){
+      b.release();
+    }
   }
 
+
+
   protected BufferAllocator getAllocator(){
     return allocator;
   }
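
The test-harness change is small but worth noting: right after the embedded Drillbit connects, it issues an alter session statement capping per-node parallelism at 2, then releases every returned QueryResultBatch so the batches' buffers go back to the allocator. A compact sketch of that pattern, using only the client calls visible in the hunk above:

    // Apply a session option and drop the result buffers (sketch based on the hunk above).
    List<QueryResultBatch> results = client.runQuery(QueryType.SQL,
        String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY));
    for (QueryResultBatch b : results) {
      b.release();  // return the batch's buffers to the allocator
    }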

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
index 1c5a7f9..b5b63bc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
@@ -25,7 +25,7 @@ public class TestTpchDistributed extends BaseTestQuery{
 
   private void testDistributed(String fileName) throws Exception{
     String query = getFile(fileName);
-    test(query);
+    test("alter session set `planner.slice_target` = 10; " + query);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index ea19351..f6972c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -50,7 +50,7 @@ public class TestFragmentChecker extends PopUnitTestBase{
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
     Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
     PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
-    SimpleParallelizer par = new SimpleParallelizer();
+    SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2);
     List<DrillbitEndpoint> endpoints = Lists.newArrayList();
     DrillbitEndpoint localBit = null;
     for(int i =0; i < bitCount; i++){
@@ -59,7 +59,6 @@ public class TestFragmentChecker extends PopUnitTestBase{
       endpoints.add(b1);
     }
 
-    par.setGlobalMaxWidth(10).setMaxWidthPerEndpoint(5);
     QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
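
SimpleParallelizer loses its fluent setters here and takes its limits directly in the constructor. This hunk does not name the four arguments, so the mapping below is an inference read against the removed setter calls, not something the diff confirms:

    // Assumed parameter order, inferred from the setters this constructor replaces:
    //   1000*1000 -> parallelization threshold (records per slice before fanning out)
    //   5         -> max width per endpoint    (was setMaxWidthPerEndpoint(5))
    //   10        -> global max width          (was setGlobalMaxWidth(10))
    //   1.2       -> affinity factor           (replaces the config-driven setAffinityFactor)
    SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2);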
 
