Add a "--stdout" option to ReferenceInterpreter.main that allows execution of plans that write to stdout (via a queue) rather than to a file.
Signed-off-by: Jacques Nadeau <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6db5f17e Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6db5f17e Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6db5f17e Branch: refs/heads/execwork Commit: 6db5f17e25387a15afade3a4dc1aebfd7a3f1850 Parents: 8af3381 Author: Julian Hyde <[email protected]> Authored: Sat May 18 13:29:59 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jun 6 11:06:42 2013 -0700 ---------------------------------------------------------------------- .../drill/exec/ref/ReferenceInterpreter.java | 60 ++++++++++++++- .../org/apache/drill/exec/ref/rse/QueueRSE.java | 8 +- 2 files changed, 61 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6db5f17e/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java index ac4c891..f90be4f 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ReferenceInterpreter.java @@ -20,6 +20,8 @@ package org.apache.drill.exec.ref; import java.io.File; import java.io.IOException; import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.logical.LogicalPlan; @@ -74,7 +76,16 @@ public class ReferenceInterpreter { public static void main(String[] args) 
throws Exception{ DrillConfig config = DrillConfig.create(); - final String jsonFile = args[0]; + int arg = 0; + final BlockingQueue<Object> queue; + if (arg < args.length && args[arg].equals("--stdout")) { + ++arg; + queue = new ArrayBlockingQueue<>(100); + config.setSinkQueues(0, queue); + } else { + queue = null; + } + final String jsonFile = args[arg]; final String planString; if (jsonFile.startsWith("inline:")) { planString = jsonFile.substring("inline:".length()); @@ -85,6 +96,47 @@ public class ReferenceInterpreter { IteratorRegistry ir = new IteratorRegistry(); ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config)); i.setup(); + final Object[] result = {null}; + final Thread thread; + if (queue != null) { + thread = new Thread( + new Runnable() { + @Override + public void run() { + try { + result[0] = run0(); + } catch (Throwable e) { + result[0] = e; + } + } + private boolean run0() throws IOException { + for (;;) { + try { + Object o = queue.take(); + if (o instanceof RunOutcome.OutcomeType) { + switch ((RunOutcome.OutcomeType) o) { + case SUCCESS: + return true; // end of data + case CANCELED: + throw new RuntimeException("canceled"); + case FAILED: + default: + throw new RuntimeException("failed"); + } + } else { + System.out.write((byte[]) o); + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } + } + } + }); + thread.start(); + } else { + thread = null; + } Collection<RunOutcome> outcomes = i.run(); for(RunOutcome outcome : outcomes){ @@ -93,9 +145,11 @@ public class ReferenceInterpreter { if(outcome.outcome == RunOutcome.OutcomeType.FAILED && outcome.exception != null){ outcome.exception.printStackTrace(); } - } - + if (thread != null) { + thread.join(); + System.out.println("Result: " + result[0]); + } } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6db5f17e/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java index f8976b2..8f7a6a5 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java @@ -19,9 +19,7 @@ package org.apache.drill.exec.ref.rse; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import org.apache.drill.common.config.DrillConfig; @@ -35,6 +33,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import javax.validation.constraints.NotNull; + public class QueueRSE extends RSEBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueRSE.class); @@ -95,7 +95,7 @@ public class QueueRSE extends RSEBase { private final Queue<Object> queue; public QueueRecordRecorder(Queue<Object> queue) { - this.queue = queue; + this.queue = Objects.requireNonNull(queue); } @Override
