flatten first pass: writes out a new output field; doing everything but splitting out arrays into multiple records
multiplying, but not cloning properly deep copy of data values, working flatten cleanup cleanup drop working Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9d5b0ef8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9d5b0ef8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9d5b0ef8 Branch: refs/heads/master Commit: 9d5b0ef86a8bf3b21d0d8e888949a5ebda8a4984 Parents: 32fc527 Author: Chris Merrick <[email protected]> Authored: Fri Feb 1 18:55:23 2013 -0500 Committer: Jacques Nadeau <[email protected]> Committed: Sun Feb 17 10:26:29 2013 -0800 ---------------------------------------------------------------------- .../apache/drill/common/logical/data/Flatten.java | 7 +- .../org/apache/drill/exec/ref/RecordPointer.java | 1 + .../org/apache/drill/exec/ref/UnbackedRecord.java | 7 +- .../org/apache/drill/exec/ref/rops/FlattenROP.java | 143 +++++++++++++++ .../drill/exec/ref/rops/ProxySimpleRecord.java | 5 + .../drill/exec/ref/values/BaseArrayValue.java | 2 +- .../drill/exec/ref/values/BaseDataValue.java | 8 +- .../apache/drill/exec/ref/values/BaseMapValue.java | 19 ++ .../apache/drill/exec/ref/values/DataValue.java | 2 + .../apache/drill/exec/ref/values/ScalarValues.java | 44 ++++- .../drill/exec/ref/values/SimpleArrayValue.java | 14 +- .../drill/exec/ref/values/SimpleMapValue.java | 12 ++ .../src/test/resources/simple_plan_flattened.json | 54 ++++++ 13 files changed, 300 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java 
b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java index e66835b..9c8b060 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java @@ -29,12 +29,7 @@ public class Flatten extends SingleInputOperator{ private final FieldReference name; private final LogicalExpression expr; private final boolean drop; - - @JsonCreator - public Flatten(@JsonProperty("name") FieldReference name, @JsonProperty("expr") LogicalExpression expr) { - this(name, expr, true); - } - + @JsonCreator public Flatten(@JsonProperty("name") FieldReference name, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("drop") boolean drop) { this.name = name; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java index 8ac54bd..b7c523b 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/RecordPointer.java @@ -12,6 +12,7 @@ public interface RecordPointer { public DataValue getField(SchemaPath field); public void addField(SchemaPath field, DataValue value); public void addField(PathSegment segment, DataValue value); + public void removeField(SchemaPath segment); public void write(DataWriter writer) throws IOException; public RecordPointer copy(); public void copyFrom(RecordPointer r); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java 
---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java index ee658f6..ada191d 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java @@ -44,6 +44,11 @@ public class UnbackedRecord implements RecordPointer{ public void addField(PathSegment segment, DataValue value) { root.addValue(segment, value); } + + @Override + public void removeField(SchemaPath field) { + root.removeValue(field.getRootSegment()); + } @Override public void write(DataWriter writer) throws IOException { @@ -80,7 +85,7 @@ public class UnbackedRecord implements RecordPointer{ @Override public void copyFrom(RecordPointer r) { if(r instanceof UnbackedRecord){ - this.root = ((UnbackedRecord)r).root; + this.root = ((UnbackedRecord)r).root.copy(); }else{ throw new UnsupportedOperationException(String.format("Unable to copy from a record of type %s to an UnbackedRecord.", r.getClass().getCanonicalName())); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java new file mode 100644 index 0000000..64eca8b --- /dev/null +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java @@ -0,0 +1,143 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.ref.rops; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.types.DataType; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.logical.data.Flatten; +import org.apache.drill.exec.ref.RecordIterator; +import org.apache.drill.exec.ref.RecordPointer; +import org.apache.drill.exec.ref.UnbackedRecord; +import org.apache.drill.exec.ref.eval.EvaluatorFactory; +import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator; +import org.apache.drill.exec.ref.values.BaseArrayValue; +import org.apache.drill.exec.ref.values.DataValue; +import org.apache.drill.exec.ref.values.SimpleArrayValue; + + +public class FlattenROP extends SingleInputROPBase<Flatten> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenROP.class); + + private RecordPointer outputRecord = new UnbackedRecord(); + private BasicEvaluator evaluator; + private RecordIterator iter; + + public FlattenROP(Flatten config) { + super(config); + } + + @Override + 
protected void setInput(RecordIterator incoming) { + this.iter = new FlattenIterator(incoming); + } + + @Override + protected RecordIterator getIteratorInternal() { + return iter; + } + + @Override + protected void setupEvals(EvaluatorFactory builder) { + evaluator = builder.getBasicEvaluator(record, config.getExpr()); + } + + private class ArrayValueIterator { + private BaseArrayValue arrayValue; + private int currentIndex = 0; + + public ArrayValueIterator(BaseArrayValue arrayValue) { + this.arrayValue = arrayValue; + } + + public ArrayValueIterator() { + this(new SimpleArrayValue()); + } + + public DataValue next() { + DataValue v = null; + if (currentIndex < arrayValue.size()) { + v = arrayValue.getByArrayIndex(currentIndex); + } + + currentIndex++; + return v; + + } + } + + private class FlattenIterator implements RecordIterator { + RecordIterator incoming; + NextOutcome currentOutcome; + int currentIndex = 0; + ArrayValueIterator arrayValueIterator = new ArrayValueIterator(); + + public FlattenIterator(RecordIterator incoming) { + super(); + this.incoming = incoming; + } + + @Override + public RecordPointer getRecordPointer() { + return outputRecord; + } + + @Override + public NextOutcome next() { + DataValue v; + if ((v = arrayValueIterator.next()) != null) //if we are already iterating through a sub-array, keep going + return mergeValue(v); + else //otherwise, get the next record + currentOutcome = incoming.next(); + + + if (currentOutcome != NextOutcome.NONE_LEFT) { + if (evaluator.eval().getDataType() == DataType.ARRAY) { + arrayValueIterator = new ArrayValueIterator(evaluator.eval().getAsContainer().getAsArray()); + + while ((v = arrayValueIterator.next()) != null) { + return mergeValue(v); + } + } else { + outputRecord.copyFrom(record); + outputRecord.addField(config.getName(), evaluator.eval()); + if(config.isDrop()) + outputRecord.removeField((SchemaPath)config.getExpr()); + } + } + return currentOutcome; + } + + // helper function to merge one of the 
values from a sub array into the parent record + private NextOutcome mergeValue(DataValue v) { + outputRecord.copyFrom(record); + outputRecord.addField(config.getName(), v); + if(config.isDrop()) + outputRecord.removeField((SchemaPath)config.getExpr()); + return NextOutcome.INCREMENTED_SCHEMA_CHANGED; + } + + @Override + public ROP getParent() { + return FlattenROP.this; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java index 5aea366..0671587 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java @@ -46,6 +46,11 @@ public class ProxySimpleRecord implements RecordPointer{ } @Override + public void removeField(SchemaPath field) { + record.removeField(field); + } + + @Override public void write(DataWriter writer) throws IOException { record.write(writer); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java index 9b14e63..503c835 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java @@ -36,7 
+36,7 @@ public abstract class BaseArrayValue extends BaseDataValue implements ContainerV } protected abstract void addToArray(int index, DataValue v); - protected abstract DataValue getByArrayIndex(int index); + public abstract DataValue getByArrayIndex(int index); protected abstract int getNextIndex(); public abstract void append(BaseArrayValue container); public abstract int size(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java index ac4cde9..971e8ce 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java @@ -36,6 +36,10 @@ public abstract class BaseDataValue implements DataValue{ throw new IllegalArgumentException("You can't add a value to a non-container type."); } + public void removeValue(PathSegment segment) { + throw new IllegalArgumentException("You can't remove a value from a non-container type."); + } + @Override public NumericValue getAsNumeric() { throw new DrillRuntimeException(String.format("A %s value is not a NumericValue.", this.getClass().getCanonicalName())); @@ -61,6 +65,8 @@ public abstract class BaseDataValue implements DataValue{ @Override public abstract int hashCode(); - + + @Override + public abstract DataValue copy(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java ---------------------------------------------------------------------- diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java index fd6a968..4fa7a51 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java @@ -49,10 +49,29 @@ public abstract class BaseMapValue extends BaseDataValue implements ContainerVal } + @Override + public void removeValue(PathSegment segment) { + if (segment.isArray()) + throw new RecordException( + "You're attempted to remove something at a particular array location while the location of that setting was a Map.", null); + + CharSequence name = segment.getNameSegment().getPath(); + DataValue current = getByNameNoNulls(name); + if (!segment.isLastPath() && current != DataValue.NULL_VALUE) { + current.removeValue(segment.getChild()); + return; + } else { + removeByName(name); + } + + } + protected abstract void setByName(CharSequence name, DataValue v); protected abstract DataValue getByName(CharSequence name); + protected abstract void removeByName(CharSequence name); + private DataValue getByNameNoNulls(CharSequence name) { DataValue v = getByName(name); if (v == null) return NULL_VALUE; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java index b78b8a1..9e40014 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java @@ -13,6 +13,7 @@ 
public interface DataValue { public DataValue getValue(PathSegment segment); public void addValue(PathSegment segment, DataValue v); + public void removeValue(PathSegment segment); public void write(DataWriter writer) throws IOException; public DataType getDataType(); public NumericValue getAsNumeric(); @@ -22,4 +23,5 @@ public interface DataValue { public BytesValue getAsBytesValue(); public boolean equals(DataValue v); public int hashCode(); + public DataValue copy(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java index 56e0b9d..2a5c1de 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java @@ -107,6 +107,11 @@ public final class ScalarValues { public int hashCode() { return seq.hashCode(); } + + @Override + public DataValue copy() { + return new StringScalar(seq.toString()); + } } @@ -163,6 +168,11 @@ public final class ScalarValues { public int hashCode() { return b ? 
1 : 0; } + + @Override + public DataValue copy() { + return new BooleanScalar(b); + } } public static class LongScalar extends NumericValue{ @@ -227,6 +237,10 @@ public final class ScalarValues { return getHashCode(l); } + @Override + public DataValue copy() { + return new LongScalar(l); + } } public static class IntegerScalar extends NumericValue{ @@ -296,7 +310,11 @@ public final class ScalarValues { public int hashCode() { return getHashCode(i); } - + + @Override + public DataValue copy() { + return new IntegerScalar(i); + } } @@ -346,6 +364,11 @@ public final class ScalarValues { public int hashCode() { return getHashCode(f); } + + @Override + public DataValue copy() { + return new FloatScalar(f); + } } @@ -397,6 +420,10 @@ public final class ScalarValues { return getHashCode(d); } + @Override + public DataValue copy() { + return new DoubleScalar(d); + } } public static class BytesScalar extends BaseDataValue implements BytesValue{ @@ -466,6 +493,13 @@ public final class ScalarValues { public int hashCode() { return HASH.hashBytes(bytes).asInt(); } + + @Override + public DataValue copy() { + byte[] out = Arrays.copyOf(bytes, bytes.length); + return new BytesScalar(out); + } + } @@ -497,9 +531,11 @@ public final class ScalarValues { public int hashCode() { return 0; } - - - + + @Override + public DataValue copy() { + return new NullValue(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleArrayValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleArrayValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleArrayValue.java index d6adefb..a8c11de 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleArrayValue.java +++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleArrayValue.java @@ -53,7 +53,7 @@ public class SimpleArrayValue extends BaseArrayValue{ } @Override - protected DataValue getByArrayIndex(int index) { + public DataValue getByArrayIndex(int index) { if(index < items.length){ DataValue ret = items[index]; if(ret == null) return NULL_VALUE; @@ -135,9 +135,13 @@ public class SimpleArrayValue extends BaseArrayValue{ return Objects.hash((Object[]) items); } - - - - + @Override + public DataValue copy() { + SimpleArrayValue out = new SimpleArrayValue(this.size()); + for(int i =0; i < this.size(); i++) + out.addToArray(i, this.getByArrayIndex(i)); + + return out; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java index 1c596e4..e16e8c1 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java @@ -45,6 +45,11 @@ public class SimpleMapValue extends BaseMapValue{ } @Override + protected void removeByName(CharSequence name) { + map.remove(name); + } + + @Override public void write(DataWriter w) throws IOException { w.writeMapStart(); @@ -94,4 +99,11 @@ public class SimpleMapValue extends BaseMapValue{ return map.hashCode(); } + public DataValue copy() { + SimpleMapValue out = new SimpleMapValue(); + for(Map.Entry<CharSequence, DataValue> entry : map.entrySet()) { + out.setByName(entry.getKey().toString(), entry.getValue().copy()); + } + return out; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9d5b0ef8/sandbox/prototype/exec/ref/src/test/resources/simple_plan_flattened.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/resources/simple_plan_flattened.json b/sandbox/prototype/exec/ref/src/test/resources/simple_plan_flattened.json new file mode 100644 index 0000000..7e38b4e --- /dev/null +++ b/sandbox/prototype/exec/ref/src/test/resources/simple_plan_flattened.json @@ -0,0 +1,54 @@ +{ + head:{ + type:"apache_drill_logical_plan", + version:"1", + generator:{ + type:"manual", + info:"na" + } + }, + sources:[ + { + type:"json", + name:"donuts-json", + files:[ + "src/test/resources/donuts.json" + ] + } + ], + query:[ + { + op:"sequence", + do:[ + { + op: "scan", + memo: "initial_scan", + ref: "donuts", + source: "donuts-json", + selection: {data: "activity"} + }, + { + op: "transform", + transforms: [ + { ref: "donuts.quantity", expr: "donuts.sales"} + ] + }, + { + op: "filter", + expr: "donuts.ppu < 1.00" + }, + { + op: "flatten", + expr: "donuts.topping", + name: "donuts.t1", + drop: 1 + }, + { + op: "write", + memo: "output sink", + file: "console:///stdout" + } + ] + } + ] +}
