Repository: incubator-drill
Updated Branches:
  refs/heads/master 08a9a90d9 -> 2d10afd4b


DRILL-396: QuerySubmitter enhancements


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2ed7a678
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2ed7a678
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2ed7a678

Branch: refs/heads/master
Commit: 2ed7a678b1caef67279a775d7e8b217e3e18041f
Parents: 08a9a90
Author: Steven Phillips <[email protected]>
Authored: Tue Mar 11 08:26:13 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Mar 11 08:26:13 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/client/QuerySubmitter.java       | 101 +++++++++++++++----
 .../org/apache/drill/exec/util/VectorUtil.java  |  34 +++++++
 2 files changed, 116 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ed7a678/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index b07b3ae..0253069 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -20,10 +20,13 @@ package org.apache.drill.exec.client;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -64,33 +67,47 @@ public class QuerySubmitter {
       jc.usage();
       System.exit(0);
     }
-    System.exit(submitter.submitQuery(o.location, o.planType, o.zk, o.local, 
o.bits));
+
+    System.exit(submitter.submitQuery(o.location, o.queryString, o.planType, 
o.zk, o.local, o.bits, o.format));
   }
 
   static class Options {
-    @Parameter(names = {"-f"}, description = "file containing plan", 
required=true)
+    @Parameter(names = {"-f, --file"}, description = "file containing plan", 
required=false)
     public String location = null;
 
-    @Parameter(names = {"-t"}, description = "type of plan, logical/physical", 
required=true)
+    @Parameter(names = {"-q", "-e", "--query"}, description = "query string", 
required = false)
+    public String queryString = null;
+
+    @Parameter(names = {"-t", "--type"}, description = "type of query, 
sql/logical/physical", required=true)
     public String planType;
 
-    @Parameter(names = {"-zk"}, description = "zookeeper connect string.", 
required=false)
+    @Parameter(names = {"-z", "--zookeeper"}, description = "zookeeper connect 
string.", required=false)
     public String zk = "localhost:2181";
 
-    @Parameter(names = {"-local"}, description = "run query in local mode", 
required=false)
+    @Parameter(names = {"-l", "--local"}, description = "run query in local 
mode", required=false)
     public boolean local;
 
-    @Parameter(names = "-bits", description = "number of drillbits to run. 
local mode only", required=false)
+    @Parameter(names = {"-b", "--bits"}, description = "number of drillbits to 
run. local mode only", required=false)
     public int bits = 1;
 
-    @Parameter(names = {"-h", "-help", "--help"}, description = "show usage", 
help=true)
+    @Parameter(names = {"-h", "--help"}, description = "show usage", help=true)
     public boolean help = false;
+
+    @Parameter(names = {"--format"}, description = "output format, 
csv,tsv,table", required = false)
+    public String format = "table";
   }
 
-  public int submitQuery(String planLocation, String type, String zkQuorum, 
boolean local, int bits) throws Exception {
+  public enum Format {
+    TSV, CSV, TABLE
+  }
+
+  public int submitQuery(String planLocation, String queryString, String type, 
String zkQuorum, boolean local, int bits, String format) throws Exception {
     DrillConfig config = DrillConfig.create();
     DrillClient client = null;
-    
+
+    Preconditions.checkArgument(!(planLocation == null && queryString == 
null), "Must provide either query file or query string");
+    Preconditions.checkArgument(!(planLocation != null && queryString != 
null), "Must provide either query file or query string, not both");
+
     try{
       if (local) {
         RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
@@ -106,29 +123,62 @@ public class QuerySubmitter {
         client = new DrillClient(config, clusterCoordinator);
       }
       client.connect();
-      QueryResultsListener listener = new QueryResultsListener();
-      String plan = 
Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
+      QueryResultsListener listener;
+      String plan;
+      if (queryString == null) {
+        plan = 
Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
+      } else {
+        plan = queryString;
+      }
+      String[] queries;
       UserProtos.QueryType queryType;
       type = type.toLowerCase();
       switch(type) {
         case "sql":
           queryType = UserProtos.QueryType.SQL;
+          queries = plan.split(";");
           break;
         case "logical":
           queryType = UserProtos.QueryType.LOGICAL;
+          queries = new String[]{ plan };
           break;
         case "physical":
           queryType = UserProtos.QueryType.PHYSICAL;
+          queries = new String[]{ plan };
           break;
         default:
           System.out.println("Invalid query type: " + type);
           return -1;
       }
+      Format outputFormat;
+      format = format.toLowerCase();
+      switch(format) {
+        case "csv":
+          outputFormat = Format.CSV;
+          break;
+        case "tsv":
+          outputFormat = Format.TSV;
+          break;
+        case "table":
+          outputFormat = Format.TABLE;
+          break;
+        default:
+          System.out.println("Invalid format type: " + format);
+          return -1;
+      }
       Stopwatch watch = new Stopwatch();
-      watch.start();
-      client.runQuery(queryType, plan, listener);
-      int rows = listener.await();
-      System.out.println(String.format("Got %d record%s in %f seconds", rows, 
rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / 
(float)1000));
+      for (String query : queries) {
+        listener = new QueryResultsListener(outputFormat);
+        watch.start();
+        client.runQuery(queryType, query, listener);
+        int rows = listener.await();
+        System.out.println(String.format("%d record%s selected (%f seconds)", 
rows, rows > 1 ? "s" : "", (float) watch.elapsed(TimeUnit.MILLISECONDS) / 
(float) 1000));
+        if (query != queries[queries.length - 1]) {
+          System.out.println();
+        }
+        watch.stop();
+        watch.reset();
+      }
       return 0;
     }finally{
       if(client != null) client.close();
@@ -139,7 +189,11 @@ public class QuerySubmitter {
     AtomicInteger count = new AtomicInteger();
     private CountDownLatch latch = new CountDownLatch(1);
     RecordBatchLoader loader = new RecordBatchLoader(new 
BootStrapContext(DrillConfig.create()).getAllocator());
-    int width;
+    Format format;
+
+    public QueryResultsListener(Format format) {
+      this.format = format;
+    }
 
     @Override
     public void submissionFailed(RpcException ex) {
@@ -157,12 +211,21 @@ public class QuerySubmitter {
         } catch (SchemaChangeException e) {
           submissionFailed(new RpcException(e));
         }
-        
-        VectorUtil.showVectorAccessibleContent(loader);
+
+        switch(format) {
+          case TABLE:
+            VectorUtil.showVectorAccessibleContent(loader);
+            break;
+          case TSV:
+            VectorUtil.showVectorAccessibleContent(loader, "\t");
+            break;
+          case CSV:
+            VectorUtil.showVectorAccessibleContent(loader, ",");
+            break;
+        }
       }
       
       if (result.getHeader().getIsLastChunk()) {
-        //System.out.println(StringUtils.repeat("-", width*17 + 1));
         latch.countDown();
       }
       result.release();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ed7a678/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index 553e50d..a35bb5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -29,6 +29,40 @@ import com.beust.jcommander.internal.Lists;
 
 public class VectorUtil {
 
+  public static void showVectorAccessibleContent(VectorAccessible va, final 
String delimiter) {
+
+    int rows = va.getRecordCount();
+    List<String> columns = Lists.newArrayList();
+    for (VectorWrapper vw : va) {
+      columns.add(vw.getValueVector().getField().getName());
+    }
+
+    int width = columns.size();
+    for (String column : columns) {
+      System.out.printf("%s%s",column, column == columns.get(width - 1) ? "\n" 
: delimiter);
+    }
+    for (int row = 0; row < rows; row++) {
+      int columnCounter = 0;
+      for (VectorWrapper vw : va) {
+        boolean lastColumn = columnCounter == width - 1;
+        Object o = vw.getValueVector().getAccessor().getObject(row);
+        if (o == null) {
+          //null value
+          String value = "null";
+          System.out.printf("%s%s", value, lastColumn ? "\n" : delimiter);
+        }
+        else if (o instanceof byte[]) {
+          String value = new String((byte[]) o);
+          System.out.printf("%s%s", value, lastColumn ? "\n" : delimiter);
+        } else {
+          String value = o.toString();
+          System.out.printf("%s%s", value, lastColumn ? "\n" : delimiter);
+        }
+        columnCounter++;
+      }
+    }
+  }
+
   public static void showVectorAccessibleContent(VectorAccessible va) {
 
     int rows = va.getRecordCount();

Reply via email to