Add a "--stdout" option to ReferenceInterpreter.main, which allows execution of
plans that write to stdout (via a queue) rather than to a file.

Signed-off-by: Jacques Nadeau <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6db5f17e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6db5f17e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6db5f17e

Branch: refs/heads/execwork
Commit: 6db5f17e25387a15afade3a4dc1aebfd7a3f1850
Parents: 8af3381
Author: Julian Hyde <[email protected]>
Authored: Sat May 18 13:29:59 2013 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu Jun 6 11:06:42 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/ref/ReferenceInterpreter.java       |   60 ++++++++++++++-
 .../org/apache/drill/exec/ref/rse/QueueRSE.java    |    8 +-
 2 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6db5f17e/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
index ac4c891..f90be4f 100644
--- 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.ref;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -74,7 +76,16 @@ public class ReferenceInterpreter {
   
   public static void main(String[] args) throws Exception{
     DrillConfig config = DrillConfig.create();
-    final String jsonFile = args[0];
+    int arg = 0;
+    final BlockingQueue<Object> queue;
+    if (arg < args.length && args[arg].equals("--stdout")) {
+      ++arg;
+      queue = new ArrayBlockingQueue<>(100);
+      config.setSinkQueues(0, queue);
+    } else {
+      queue = null;
+    }
+    final String jsonFile = args[arg];
     final String planString;
     if (jsonFile.startsWith("inline:")) {
       planString = jsonFile.substring("inline:".length());
@@ -85,6 +96,47 @@ public class ReferenceInterpreter {
     IteratorRegistry ir = new IteratorRegistry();
     ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new 
BasicEvaluatorFactory(ir), new RSERegistry(config));
     i.setup();
+    final Object[] result = {null};
+    final Thread thread;
+    if (queue != null) {
+      thread = new Thread(
+          new Runnable() {
+            @Override
+            public void run() { // thread body: records in result[0] how run0() finished
+              try {
+                result[0] = run0(); // the boolean returned by run0()
+              } catch (Throwable e) {
+                result[0] = e; // store whatever run0() threw instead of letting it propagate
+              }
+            }
+            private boolean run0() throws IOException { // drains queue to stdout; returns true on SUCCESS
+              for (;;) { // loops until an OutcomeType marker is taken or an exception is thrown
+                try {
+                  Object o = queue.take(); // blocks until an element is available
+                  if (o instanceof RunOutcome.OutcomeType) { // an outcome marker ends the loop
+                    switch ((RunOutcome.OutcomeType) o) {
+                    case SUCCESS:
+                      return true; // end of data
+                    case CANCELED:
+                      throw new RuntimeException("canceled");
+                    case FAILED:
+                    default: // any other outcome is reported as a failure
+                      throw new RuntimeException("failed");
+                    }
+                  } else {
+                    System.out.write((byte[]) o); // anything else is cast to byte[] and written to stdout
+                  }
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt(); // restore the flag; Thread.interrupted() would only clear it
+                  throw new RuntimeException(e);
+                }
+              }
+            }
+          });
+      thread.start();
+    } else {
+      thread = null;
+    }
     Collection<RunOutcome> outcomes = i.run();
     
     for(RunOutcome outcome : outcomes){
@@ -93,9 +145,11 @@ public class ReferenceInterpreter {
       if(outcome.outcome == RunOutcome.OutcomeType.FAILED && outcome.exception 
!= null){
         outcome.exception.printStackTrace();
       }
-      
     }
-     
+     if (thread != null) {
+       thread.join();
+       System.out.println("Result: " + result[0]);
+     }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6db5f17e/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
index f8976b2..8f7a6a5 100644
--- 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
@@ -19,9 +19,7 @@ package org.apache.drill.exec.ref.rse;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -35,6 +33,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+import javax.validation.constraints.NotNull;
+
 public class QueueRSE extends RSEBase {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(QueueRSE.class);
 
@@ -95,7 +95,7 @@ public class QueueRSE extends RSEBase {
     private final Queue<Object> queue;
     
     public QueueRecordRecorder(Queue<Object> queue) {
-      this.queue = queue;
+      this.queue = Objects.requireNonNull(queue);
     }
 
     @Override

Reply via email to