DRILL-5 - First "working" version of Explode/Implode
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/be2ce8d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/be2ce8d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/be2ce8d2 Branch: refs/heads/master Commit: be2ce8d2d404048304dc1c4392319380d3f00cb4 Parents: 62cebad Author: tdunning <[email protected]> Authored: Fri Oct 19 17:25:17 2012 -0700 Committer: tdunning <[email protected]> Committed: Fri Oct 19 17:25:17 2012 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/drill/plan/ParsePlan.java | 43 +++++-- .../org/apache/drill/plan/PhysicalInterpreter.java | 85 ++++++++++++-- .../plan/physical/operators/ArithmeticOp.java | 7 +- .../plan/physical/operators/BatchListener.java | 11 ++ .../apache/drill/plan/physical/operators/Bind.java | 6 +- .../plan/physical/operators/EvalOperator.java | 13 ++ .../drill/plan/physical/operators/Explode.java | 61 +++++++++- .../drill/plan/physical/operators/Implode.java | 63 +++++++++++ .../drill/plan/physical/operators/JsonSchema.java | 87 +++++++++++++++ .../drill/plan/physical/operators/Operator.java | 75 +++++++++---- .../drill/plan/physical/operators/ScanJson.java | 35 +----- .../drill/plan/physical/operators/Schema.java | 10 ++- .../java/org/apache/drill/plan/ParsePlanTest.java | 12 +- .../apache/drill/plan/PhysicalInterpreterTest.java | 18 ++- sandbox/plan-parser/src/test/resources/data1.json | 1 + sandbox/plan-parser/src/test/resources/data2.json | 2 + .../src/test/resources/physical-2.drillx | 7 + 17 files changed, 431 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java index 781bc62..1d7230d 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java @@ -35,28 +35,37 @@ import java.util.Formatter; /** * Parses a plan from a resource or file. - * + * <p/> * The result is validated to ensure that symbols mentioned on the left-hand side of assignments are only mentioned * once and all referenced symbols on the right hand side are defined somewhere. */ public class ParsePlan { - public static Plan parseResource(File file) throws IOException, RecognitionException, ValidationException { + public static Plan parseResource(File file) throws ParseException { return ParsePlan.parse(Files.newReaderSupplier(file, Charsets.UTF_8)); } - public static Plan parseResource(String resourceName) throws IOException, RecognitionException, ValidationException { + public static Plan parseResource(String resourceName) throws ParseException { return ParsePlan.parse(Resources.newReaderSupplier(Resources.getResource(resourceName), Charsets.UTF_8)); } - public static Plan parse(InputSupplier<InputStreamReader> in) throws IOException, RecognitionException, ValidationException { - InputStreamReader inStream = in.getInput(); - PlanLexer lex = new PlanLexer(new ANTLRReaderStream(inStream)); - PlanParser r = new PlanParser(new CommonTokenStream(lex)); - inStream.close(); + public static Plan parse(InputSupplier<InputStreamReader> in) throws ParseException { + PlanParser r; + try { + InputStreamReader inStream = in.getInput(); + PlanLexer lex = new PlanLexer(new ANTLRReaderStream(inStream)); + r = new PlanParser(new CommonTokenStream(lex)); + inStream.close(); + } catch (IOException e) { + throw new ParseException(e); + } - Plan plan = r.plan().r; - validate(plan); - return plan; + try { + Plan plan = r.plan().r; + validate(plan); + return plan; + } catch (RecognitionException e) { + throw new ParseException(e); + } } private static void validate(Plan r) throws ValidationException { @@ -98,7 +107,17 @@ public class ParsePlan { } } - public static class ValidationException extends Exception { + public static class ParseException extends Exception { + public ParseException(Throwable throwable) { + super(throwable); + } + + public ParseException(String s) { + super(s); + } + } + + public static class ValidationException extends ParseException { public ValidationException(String s) { super(s); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java index 3058206..778ac90 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java @@ -17,6 +17,9 @@ package org.apache.drill.plan; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.drill.plan.ast.Arg; @@ -25,15 +28,14 @@ import org.apache.drill.plan.ast.Plan; import org.apache.drill.plan.physical.operators.DataListener; import org.apache.drill.plan.physical.operators.Operator; +import javax.annotation.Nullable; +import java.io.Closeable; import java.lang.reflect.InvocationTargetException; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; /** * Takes a physical plan and interprets in locally. The goal here is to provide a reference @@ -42,12 +44,22 @@ import java.util.concurrent.Future; public class PhysicalInterpreter implements DataListener { private final List<Operator> ops; - public PhysicalInterpreter(Plan prog) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public PhysicalInterpreter(Plan prog) throws SetupException { Map<Integer, Operator> bindings = Maps.newHashMap(); ops = Lists.newArrayList(); - for (Op op : prog.getStatements()) { - ops.add(Operator.create(op, bindings)); + try { + for (Op op : prog.getStatements()) { + ops.add(Operator.create(op, bindings)); + } + } catch (NoSuchMethodException e) { + throw new SetupException(e); + } catch (InvocationTargetException e) { + throw new SetupException(e); + } catch (IllegalAccessException e) { + throw new SetupException(e); + } catch (InstantiationException e) { + throw new SetupException(e); } Iterator<Op> i = prog.getStatements().iterator(); @@ -61,13 +73,46 @@ public class PhysicalInterpreter implements DataListener { } } - public void run() throws InterruptedException, ExecutionException { + public void run() throws QueryException { ExecutorService pool = Executors.newFixedThreadPool(ops.size()); - List<Future<Object>> tasks = pool.invokeAll(ops); + + // pick out the ops that are tasks + List<Callable<Object>> tasks = Lists.newArrayList(Iterables.transform(Iterables.filter(ops, new Predicate<Operator>() { + @Override + public boolean apply(@Nullable Operator operator) { + return operator instanceof Callable; + } + }), new Function<Operator, Callable<Object>>() { + @Override + public Callable<Object> apply(@Nullable Operator operator) { + // cast is safe due to previous filter + return (Callable<Object>) operator; + } + })); + + List<Future<Object>> results; + try { + results = pool.invokeAll(tasks); + } catch (InterruptedException e) { + throw new QueryException(e); + } pool.shutdown(); - for (Future<Object> task : tasks) { - System.out.printf("%s\n", task.get()); + for (Operator op : ops) { + if (op instanceof Closeable) { + op.close(); + } + } + + try { + Iterator<Callable<Object>> i = tasks.iterator(); + for (Future<Object> result : results) { + System.out.printf("%s => %s\n", i.next(), result.get()); + } + } catch (InterruptedException e) { + throw new QueryException(e); + } catch (ExecutionException e) { + throw new QueryException(e); } } @@ -76,4 +121,22 @@ public class PhysicalInterpreter implements DataListener { public void notify(Object r) { System.out.printf("out = %s\n", r); } + + public static class InterpreterException extends Exception { + private InterpreterException(Throwable throwable) { + super(throwable); + } + } + + public static class SetupException extends InterpreterException { + private SetupException(Throwable throwable) { + super(throwable); + } + } + + public static class QueryException extends InterpreterException { + private QueryException(Throwable throwable) { + super(throwable); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java index 516feff..fb65c10 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java @@ -40,10 +40,7 @@ public abstract class ArithmeticOp extends EvalOperator { public EvalOperator left, right; public ArithmeticOp(Op op, Map<Integer, Operator> bindings) { - checkArity(op, 2, 1); - - // bind our output - bindings.put(op.getOutputs().get(0).asSymbol().getInt(), this); + super(op, bindings, 2, 1); } @Override @@ -64,8 +61,6 @@ public abstract class ArithmeticOp extends EvalOperator { @Override public void link(Op op, Map<Integer, Operator> bindings) { - checkArity(op, 2, 1); - List<Arg> in = op.getInputs(); left = extractOperand(in.get(0), bindings); right = extractOperand(in.get(1), bindings); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java new file mode 100644 index 0000000..a103ffc --- /dev/null +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java @@ -0,0 +1,11 @@ +package org.apache.drill.plan.physical.operators; + +/** + * Aggregate and Implode operators need to know when batches of records are finished and thus + * they implement BatchListener. Note that the source of data is different from the source of + * batch boundaries. This avoids the need for every data processor to propagate boundaries but + * it also allows fancy structures to be constructed to do interesting things. + */ +public interface BatchListener { + public void endBatch(Object parent); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java index 329272e..4bd2b73 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java @@ -17,10 +17,8 @@ package org.apache.drill.plan.physical.operators; -import org.apache.drill.plan.ast.Arg; import org.apache.drill.plan.ast.Op; -import java.util.List; import java.util.Map; /** @@ -36,9 +34,7 @@ public class Bind extends EvalOperator { private String name; public Bind(Op op, Map<Integer, Operator> bindings) { - checkArity(op, 2, 1); - List<Arg> out = op.getOutputs(); - bindings.put(out.get(0).asSymbol().getInt(), this); + super(op, bindings, 2, 1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java index dfaea7b..d02cafe 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java @@ -17,10 +17,23 @@ package org.apache.drill.plan.physical.operators; +import org.apache.drill.plan.ast.Op; + +import java.util.Map; + /** * Describes a scalar expression. */ public abstract class EvalOperator extends Operator { + public EvalOperator(Op op, Map<Integer, Operator> bindings, int inputArgs, int outputArgs) { + super(op, bindings, inputArgs, outputArgs); + } + + // only for Constants + protected EvalOperator() { + super(); + } + public abstract Object eval(Object data); @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java index a083353..74bec49 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java @@ -1,11 +1,60 @@ package org.apache.drill.plan.physical.operators; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.util.List; +import java.util.Map; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/17/12 - * Time: 6:52 PM - * To change this template use File | Settings | File Templates. + * Explode creates a secondary data flow that contains a batch of records for + * each input record. Typically, Explode is paired with Implode which gathers + * the results back together or with Aggregate which gathers results in a different + * way. Cogroup, Explode, Aggregate gives a group by aggregate and + * Explode, Filter, Implode gives sub-tree filtering. + * + * Explode has two outputs. One is a data source and one is a batch controller (source of batch + * boundaries). Any downstream operator that delays emitting records should + * listen to the batch boundaries to avoid having aggregate outputs that cross batch + * boundaries. */ -public class Explode { +public class Explode extends Operator implements DataListener { + private String variableToExplode; + private Schema schema; + private Schema subSchema; + + public static void define() { + Operator.defineOperator("explode", Explode.class); + } + + public Explode(Op op, Map<Integer, Operator> bindings) { + // exploded-data-stream, batch-controller := explode data-in, variable-name-to-explode + super(op, bindings, 2, 1); + } + + @Override + public void link(Op op, Map<Integer, Operator> bindings) { + List<Arg> in = op.getInputs(); + Operator data = bindings.get(in.get(0).asSymbol().getInt()); + data.addDataListener(this); + schema = data.getSchema(); + + variableToExplode = in.get(1).asString(); + subSchema = schema.getSubSchema(variableToExplode); + } + + @Override + public void notify(Object row) { + // for each input we get, we iterate through our exploding variable + for (Object value : schema.getIterable(variableToExplode, row)) { + emit(value); + } + finishBatch(row); + } + + @Override + public Schema getSchema() { + return subSchema; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java new file mode 100644 index 0000000..15f0f5b --- /dev/null +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java @@ -0,0 +1,63 @@ +package org.apache.drill.plan.physical.operators; + +import com.google.common.collect.Lists; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.util.List; +import java.util.Map; + +/** + * Aggregate records into a list that is inserted into the parent record for the batch. + */ +public class Implode extends Operator implements DataListener, BatchListener { + private Operator data; + private List<Object> accumulator = Lists.newArrayList(); + private Schema schema ; + private String accumulatorVar; + + public static void define() { + Operator.defineOperator("implode", Implode.class); + } + + public Implode(Op op, Map<Integer, Operator> bindings) { + // data-out, implode-var := implode data-in, batch-master + super(op, bindings, 2, 2); + } + + @Override + public void link(Op op, Map<Integer, Operator> bindings) { + List<Arg> in = op.getInputs(); + data = bindings.get(in.get(0).asSymbol().getInt()); + data.addDataListener(this); + +// schema = data.getSchema().overlay(); + schema = new JsonSchema(); + Operator batchController = bindings.get(in.get(1).asSymbol().getInt()); + batchController.addBatchListener(this); + + accumulatorVar = Operator.gensym(); + } + + @Override + public Object eval() { + return accumulatorVar; + } + + @Override + public Schema getSchema() { + throw new UnsupportedOperationException("Default operation"); + } + + @Override + public void notify(Object r) { + accumulator.add(r); + } + + @Override + public void endBatch(Object parent) { + schema.set(accumulatorVar, parent, accumulator); + accumulator = Lists.newArrayList(); + emit(parent); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java new file mode 100644 index 0000000..19b750a --- /dev/null +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java @@ -0,0 +1,87 @@ +package org.apache.drill.plan.physical.operators; + +import com.google.common.base.Function; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.gson.*; + +import javax.annotation.Nullable; +import java.util.Collection; + +/** +* Created with IntelliJ IDEA. +* User: tdunning +* Date: 10/19/12 +* Time: 5:01 PM +* To change this template use File | Settings | File Templates. +*/ +public class JsonSchema extends Schema { + Splitter onDot = Splitter.on("."); + + @Override + public Object get(String name, Object data) { + JsonElement r = (JsonElement) data; + Iterable<String> bits = onDot.split(name); + for (String bit : bits) { + r = ((JsonObject) data).get(bit); + } + return cleanupJsonisms(r); + } + + @Override + public <T> Iterable<T> getIterable(String name, Object data) { + Object r = get(name, data); + if (r instanceof JsonArray) { + return Iterables.transform((JsonArray) r, new Function<JsonElement, T>() { + @Override + public T apply(@Nullable JsonElement jsonElement) { + return (T) cleanupJsonisms(jsonElement); + } + }); + } else { + throw new IllegalArgumentException("Looked for array but value was " + r.getClass()); + } + } + + @Override + public Schema getSubSchema(String name) { + return new JsonSchema(); + } + + @Override + public Schema overlay() { + return this; + } + + @Override + public void set(String name, Object parent, Object value) { + if (value instanceof Collection) { + // input is likely to have been collected by Implode + // TODO but what if something else built this? Do we need a general serialization framework? + JsonArray r = new JsonArray(); + for (Object v : ((Collection) value)) { + r.add((JsonElement) v); + } + ((JsonObject) parent).add(name, r); + } else if (value instanceof Number) { + ((JsonObject) parent).add(name, new JsonPrimitive((Number) value)); + } else { + throw new IllegalArgumentException(String.format("Can't convert a %s to JSON by magic", value.getClass())); + } + } + + private Object cleanupJsonisms(JsonElement data) { + if (data instanceof JsonPrimitive) { + JsonPrimitive v = (JsonPrimitive) data; + if (v.isNumber()) { + return v.getAsDouble(); + } else if (v.isString()) { + return v.getAsString(); + } else { + return v.getAsBoolean(); + } + } else { + return data; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java index d34ff9c..9c6c0cb 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java @@ -26,34 +26,48 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; /** * Implements a function for an operator on a single line of the physical plan. - * + * <p/> * The life cycle of an operator is * <nl> - * <li>The operator's constructor is defined using Operator.defineOperator</li> - * <li>The operator is constructed via Operator.create. It is expected that - * the operator will fill in references to it's own outputs into the DAG bindings</li> - * <li>The operator is linked by a call to its link() method. At this point, the - * operator can look at its arguments and resolve references to its inputs. - * This is when it should add itself as a data listener and when it should request - * any schema that it needs from upstream Operator's.</li> - * <li>The operator's run() method is called. Most operators should simply return at this - * point, but data sources should start calling emit with data records.</li> - * <li>The operator will be notified of incoming data. It should process this data - * and emit the result.</li> + * <li>The operator's constructor is defined using Operator.defineOperator</li> + * <li>The operator is constructed via Operator.create. It is expected that + * the operator will fill in references to it's own outputs into the DAG bindings</li> + * <li>The operator is linked by a call to its link() method. At this point, the + * operator can look at its arguments and resolve references to its inputs. + * This is when it should add itself as a data listener and when it should request + * any schema that it needs from upstream Operator's.</li> + * <li>The operator's run() method is called. Most operators should simply return at this + * point, but data sources should start calling emit with data records.</li> + * <li>The operator will be notified of incoming data. It should process this data + * and emit the result.</li> * </nl> */ -public abstract class Operator implements Callable<Object> { +public abstract class Operator { + private static AtomicInteger genCount = new AtomicInteger(0); private static final Map<String, Class<? extends Operator>> operatorMap = Maps.newHashMap(); + public Operator(Op op, Map<Integer, Operator> bindings, int inputArgs, int outputArgs) { + checkArity(op, inputArgs, outputArgs); + for (Arg arg : op.getOutputs()) { + bindings.put(arg.asSymbol().getInt(), this); + } + } + + // only for testing and constants + protected Operator() { + } + static { ArithmeticOp.define(); Bind.define(); Filter.define(); ScanJson.define(); + Explode.define(); + Implode.define(); } @@ -67,34 +81,53 @@ public abstract class Operator implements Callable<Object> { public static Operator create(Op op, Map<Integer, Operator> bindings) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException { Class<? extends Operator> c = operatorMap.get(op.getOp()); if (c == null) { - throw new IllegalArgumentException(String.format("No such operators as %s", op.getOp())); + throw new IllegalArgumentException(String.format("No such operator as %s", op.getOp())); } Constructor<? extends Operator> con = c.getConstructor(Op.class, Map.class); return con.newInstance(op, bindings); } - protected final List<DataListener> dataOut = Lists.newArrayList(); + + public static String gensym() { + return String.format("__sym-%d", genCount.incrementAndGet()); + } + + private final List<DataListener> dataOut = Lists.newArrayList(); + private List<BatchListener> batchOut = Lists.newArrayList(); public void addDataListener(DataListener listener) { this.dataOut.add(listener); } + public void addBatchListener(BatchListener listener) { + this.batchOut.add(listener); + } + protected void emit(Object r) { for (DataListener listener : dataOut) { listener.notify(r); } } - public double eval() { + protected void finishBatch(Object parent) { + for (BatchListener listener : batchOut) { + listener.endBatch(parent); + } + } + + public double evalAsDouble() { throw new UnsupportedOperationException("default no can do"); //To change body of created methods use File | Settings | File Templates. } + public Object eval() { + return null; + } + public abstract void link(Op op, Map<Integer, Operator> bindings); - public Object call() throws Exception { - // do nothing - return null; + public void close() { + // do nothing by default... over-ride for clever behavior } public abstract Schema getSchema(); @@ -107,7 +140,7 @@ public abstract class Operator implements Callable<Object> { List<Arg> out = op.getOutputs(); if (out.size() != outputArgs) { - throw new IllegalArgumentException("bind should have exactly one output"); + throw new IllegalArgumentException(String.format("Operator should have exactly %d outputs, not %d", outputArgs, out.size())); } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java index 2e7155d..0b699c3 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java @@ -18,14 +18,10 @@ package org.apache.drill.plan.physical.operators; import com.google.common.base.Charsets; -import com.google.common.base.Splitter; import com.google.common.io.Files; import com.google.common.io.InputSupplier; import com.google.common.io.Resources; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonStreamParser; +import com.google.gson.*; import org.apache.drill.plan.ast.Arg; import org.apache.drill.plan.ast.Op; @@ -35,11 +31,12 @@ import java.io.InputStreamReader; import java.io.Reader; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; /** * Reads JSON formatted records from a file. */ -public class ScanJson extends Operator { +public class ScanJson extends Operator implements Callable<Object> { public static void define() { Operator.defineOperator("scan-json", ScanJson.class); @@ -90,6 +87,9 @@ public class ScanJson extends Operator { count++; } in.close(); + // TODO what should the parent record be at the top-level? + finishBatch(null); + return count; } @@ -98,27 +98,4 @@ public class ScanJson extends Operator { return new JsonSchema(); } - private class JsonSchema extends Schema { - Splitter onDot = Splitter.on("."); - - @Override - public Object get(String name, Object data) { - Iterable<String> bits = onDot.split(name); - for (String bit : bits) { - data = ((JsonObject) data).get(bit); - } - if (data instanceof JsonPrimitive) { - JsonPrimitive v = (JsonPrimitive) data; - if (v.isNumber()) { - return v.getAsDouble(); - } else if (v.isString()) { - return v.getAsString(); - } else { - return v.getAsBoolean(); - } - } else { - return data; - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java index e38a792..424c715 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java @@ -19,9 +19,17 @@ package org.apache.drill.plan.physical.operators; /** * Describes a schema. In this context a schema is what understands how to get data - * out of a record. For JSON, the schema is pretty dumb, but for other data types it + * into and out of a record. For JSON, the schema is pretty dumb, but for other data types it * could be quite clever. */ public abstract class Schema { public abstract Object get(String name, Object data); + + public abstract <T> Iterable<? extends T> getIterable(String name, Object data); + + public abstract Schema getSubSchema(String name); + + public abstract Schema overlay(); + + public abstract void set(String name, Object parent, Object value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java index 7bbbaf6..7c4d6c4 100644 --- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java +++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.Lists; import com.google.common.io.Resources; import org.antlr.runtime.ANTLRReaderStream; import org.antlr.runtime.MissingTokenException; -import org.antlr.runtime.RecognitionException; import org.antlr.runtime.Token; import org.apache.drill.plan.ast.LogicalPlanParseException; import org.apache.drill.plan.ast.Plan; @@ -38,25 +37,25 @@ import static junit.framework.Assert.*; public class ParsePlanTest { @Test - public void testParse1() throws IOException, RecognitionException, ParsePlan.ValidationException { + public void testParse1() throws ParsePlan.ParseException { Plan r = ParsePlan.parseResource("plan1.drillx"); assertEquals("Lines", 3, r.getStatements().size()); } @Test - public void testParse2() throws IOException, RecognitionException, ParsePlan.ValidationException { + public void testParse2() throws ParsePlan.ParseException { Plan r = ParsePlan.parseResource("plan2.drillx"); assertEquals("Lines", 6, r.getStatements().size()); } @Test - public void testParse3() throws IOException, RecognitionException, ParsePlan.ValidationException { + public void testParse3() throws ParsePlan.ParseException { Plan r = ParsePlan.parseResource("plan3.drillx"); assertEquals("Lines", 8, r.getStatements().size()); } @Test - public void testParseError1() throws IOException, RecognitionException, ParsePlan.ValidationException { + public void testParseError1() throws ParsePlan.ParseException { try { ParsePlan.parseResource("bad-plan1.drillx"); fail("Should have thrown exception"); @@ -67,7 +66,7 @@ public class ParsePlanTest { } @Test - public void testParseError2() throws IOException, RecognitionException, ParsePlan.ValidationException { + public void testParseError2() throws ParsePlan.ParseException { try { ParsePlan.parseResource("bad-plan2.drillx"); fail("Should have thrown exception"); @@ -80,7 +79,6 @@ public class ParsePlanTest { } - @Test public void testLexer() throws IOException { List<String> ref = Lists.newArrayList( http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java index 6d401c1..77a9e57 100644 --- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java +++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java @@ -17,18 +17,22 @@ package org.apache.drill.plan; -import org.antlr.runtime.RecognitionException; import org.apache.drill.plan.ast.Plan; import org.junit.Test; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.concurrent.ExecutionException; - public class PhysicalInterpreterTest { @Test - public void testTrivialPlan() throws ParsePlan.ValidationException, RecognitionException, IOException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException, InterruptedException, ExecutionException { - Plan p = ParsePlan.parseResource("physical-1.drillx"); + public void testTrivialPlan() throws PhysicalInterpreter.InterpreterException, ParsePlan.ParseException { + run("physical-1.drillx"); + } + + @Test + public void testExplodeFilter() throws PhysicalInterpreter.InterpreterException, ParsePlan.ParseException { + run("physical-2.drillx"); + } + + private void run(String name) throws ParsePlan.ParseException, PhysicalInterpreter.SetupException, PhysicalInterpreter.QueryException { + Plan p = ParsePlan.parseResource(name); PhysicalInterpreter pi = new PhysicalInterpreter(p); pi.run(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/data1.json ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/resources/data1.json b/sandbox/plan-parser/src/test/resources/data1.json index ddfaa50..faff8fd 100644 --- a/sandbox/plan-parser/src/test/resources/data1.json +++ b/sandbox/plan-parser/src/test/resources/data1.json @@ -2,3 +2,4 @@ {"a":2, "b":2} {"a":3, "b":2} {"a":4, "b":2} + http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/data2.json ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/resources/data2.json b/sandbox/plan-parser/src/test/resources/data2.json new file mode 100644 index 0000000..ba3f74d --- /dev/null +++ b/sandbox/plan-parser/src/test/resources/data2.json @@ -0,0 +1,2 @@ +{"x":[{"a":1, "b":2}, {"a":2, "b":2}, {"a":3, "b":2}, {"a":4, "b":2}], "y":[{"a":4, "b":2}]} +{"x":[{"a":5, "b":2}, {"a":4, "b":2}, {"a":3, "b":2}, {"a":2, "b":2}], "y":[{"a":5, "b":2}, {"a":4, "b":2}]} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/physical-2.drillx ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/resources/physical-2.drillx b/sandbox/plan-parser/src/test/resources/physical-2.drillx new file mode 100644 index 0000000..14924c7 --- /dev/null +++ b/sandbox/plan-parser/src/test/resources/physical-2.drillx @@ -0,0 +1,7 @@ +# sub-tree filtering +%1 := scan-json "resource:data2.json" +%2 := explode %1, "x" +%4 := bind "a", %2 +%5 := > %4, 3 +%6 := filter %5, %2 +%7,%8 := implode %6, %2
