http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
new file mode 100644
index 0000000..3f86c98
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.exceptions.MajorException;
+import org.apache.drill.exec.ref.rops.ROP;
+
+import com.typesafe.config.Config;
+
+/**
+ * Convenience base class for {@link ReferenceStorageEngine} implementations.
+ *
+ * <p>Reports both capabilities as unsupported and throws
+ * {@link UnsupportedOperationException} from every accessor, so a subclass only
+ * overrides the read and/or write methods it actually provides.
+ */
+public abstract class RSEBase implements ReferenceStorageEngine{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSEBase.class);
+
+  /** @return {@code false} in this base implementation */
+  @Override
+  public boolean supportsRead() {
+    return false;
+  }
+
+  /** @return {@code false} in this base implementation */
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  /** @throws UnsupportedOperationException always, in this base implementation */
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    throw unsupported("reads");
+  }
+
+  /** @throws UnsupportedOperationException always, in this base implementation */
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    throw unsupported("reads");
+  }
+
+  /** @throws UnsupportedOperationException always, in this base implementation */
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    throw unsupported("writes");
+  }
+
+  /** Builds the exception thrown by the default accessors; {@code operation} is "reads" or "writes". */
+  private UnsupportedOperationException unsupported(String operation){
+    return new UnsupportedOperationException(String.format("%s does not support %s.", this.getClass().getCanonicalName(), operation));
+  }
+
+  /**
+   * Scans the packages configured under
+   * {@link ExecRefConstants#STORAGE_ENGINE_SCAN_PACKAGES} for storage engine classes.
+   *
+   * @param config configuration supplying the list of packages to scan
+   * @return the {@link ReferenceStorageEngine} implementations reported by {@link PathScanner}
+   */
+  public static Class<?>[] getSubTypes(Config config){
+    Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    return engines.toArray(new Class<?>[engines.size()]);
+  }
+
+  /**
+   * Narrows a generic {@link ReadEntry} to the concrete entry type an engine expects.
+   *
+   * @param c entry type required by the caller
+   * @param entry entry to narrow; may be {@code null}
+   * @return {@code entry} cast to {@code T}
+   * @throws MajorException if {@code entry} is {@code null} or is not an instance of {@code c}
+   */
+  protected <T extends ReadEntry> T getReadEntry(Class<T> c, ReadEntry entry){
+    if(!c.isInstance(entry)){
+      // isInstance is null-safe, so a missing entry surfaces as a MajorException rather than an NPE.
+      String received = entry == null ? "null" : entry.getClass().getCanonicalName();
+      throw new MajorException(String.format("Expected entry type was invalid. Expected entry of type %s but received type of %s.", c.getCanonicalName(), received));
+    }
+    return c.cast(entry);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
new file mode 100644
index 0000000..4266aac
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
@@ -0,0 +1,119 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+import com.typesafe.config.Config;
+
+/**
+ * Registry of {@link ReferenceStorageEngine} implementations found on the class path.
+ *
+ * <p>{@link #setup(DrillConfig)} indexes each engine's public two-argument constructor by
+ * the {@link StorageEngineConfig} type it accepts; {@link #getEngine(StorageEngineConfig)}
+ * uses that index to build an engine and reuses it for later requests with an equal
+ * configuration.
+ */
+public class RSERegistry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSERegistry.class);
+
+  /** Engine constructors, keyed by the type of their first ({@link StorageEngineConfig}) parameter. */
+  private final Map<Object, Constructor<? extends ReferenceStorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends ReferenceStorageEngine>>();
+  /** Engines created so far, keyed by the configuration they were created from. */
+  private final Map<StorageEngineConfig, ReferenceStorageEngine> activeEngines = new HashMap<StorageEngineConfig, ReferenceStorageEngine>();
+  /** Configuration handed to every engine constructor as its second argument. */
+  private final DrillConfig config;
+
+  /**
+   * Creates a registry and immediately scans for available engines.
+   *
+   * @param config Drill configuration; supplies the packages to scan and is passed to each engine
+   */
+  public RSERegistry(DrillConfig config){
+    this.config = config;
+    setup(config);
+  }
+
+  /**
+   * Scans the packages listed under {@link ExecRefConstants#STORAGE_ENGINE_SCAN_PACKAGES} and
+   * registers every public engine constructor that {@link #getEngine(StorageEngineConfig)} can
+   * invoke, i.e. one taking a {@link StorageEngineConfig} subtype followed by a parameter that
+   * accepts a {@link DrillConfig}.
+   *
+   * @param config configuration supplying the packages to scan
+   */
+  @SuppressWarnings("unchecked")
+  public void setup(DrillConfig config){
+    Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage engines {}", engines);
+    for(Class<? extends ReferenceStorageEngine> engine: engines){
+      int i =0;
+      for(Constructor<?> c : engine.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        // The second parameter must be able to receive the DrillConfig that getEngine() passes in.
+        if(params.length != 2 || !StorageEngineConfig.class.isAssignableFrom(params[0]) || !params[1].isAssignableFrom(DrillConfig.class)){
+          logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, Config)]", c, engine);
+          continue;
+        }
+        availableEngines.put(params[0], (Constructor<? extends ReferenceStorageEngine>) c);
+        i++;
+      }
+      if(i == 0){
+        logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorageEngineConfig, Config)", engine.getCanonicalName());
+      }
+    }
+  }
+
+  /**
+   * Returns the engine for the given configuration, creating it on first use.
+   *
+   * @param engineConfig configuration identifying the engine; its class selects the constructor
+   * @return the engine previously created for an equal configuration, or a newly created one
+   * @throws SetupException if no constructor is registered for the configuration's class, or
+   *         if constructing the engine fails
+   */
+  public ReferenceStorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
+    ReferenceStorageEngine engine = activeEngines.get(engineConfig);
+    if(engine != null) return engine;
+    Constructor<? extends ReferenceStorageEngine> c = availableEngines.get(engineConfig.getClass());
+    if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
+    try {
+      engine = c.newInstance(engineConfig, config);
+      // Cache the instance; without this the lookup above always misses and a new engine is built per call.
+      activeEngines.put(engineConfig, engine);
+      return engine;
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
+      if(t instanceof SetupException) throw ((SetupException) t);
+      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java new file mode 100644 index 0000000..b7840bc --- /dev/null +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java @@ -0,0 +1,28 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.ref.rse; + +import org.apache.drill.exec.ref.RecordIterator; + +public interface RecordReader { + + public abstract RecordIterator getIterator(); + public abstract void setup(); + public abstract void cleanup(); + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
new file mode 100644
index 0000000..9527b0b
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome;
+
+/**
+ * Destination for records; instances are handed out by
+ * {@link ReferenceStorageEngine#getWriter}.
+ */
+public interface RecordRecorder {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
+
+  /** Lifecycle hook. NOTE(review): presumably invoked before the first {@link #recordRecord} call -- confirm. */
+  void setup() throws IOException;
+
+  /** Hands one record to the recorder. NOTE(review): the meaning of the returned long is not visible here -- confirm. */
+  long recordRecord(RecordPointer pointer) throws IOException;
+
+  /** Lifecycle hook receiving the run's outcome type. NOTE(review): presumably invoked after the last record -- confirm. */
+  void finish(RunOutcome.OutcomeType outcome) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
new file mode 100644
index 0000000..41cba45
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.rops.ROP;
+
+/**
+ * Contract implemented by storage engines: capability flags plus factory methods for
+ * {@link RecordReader}s and {@link RecordRecorder}s.
+ */
+public interface ReferenceStorageEngine {
+  /** @return whether this engine supports reads */
+  boolean supportsRead();
+
+  /** @return whether this engine supports writes */
+  boolean supportsWrite();
+
+  /** NOTE(review): not referenced by any method of this interface; presumably describes partitioning an engine can offer -- confirm. */
+  enum PartitionCapabilities {
+    NONE, HASH, SORTED
+  }
+
+  /** NOTE(review): not referenced by any method of this interface; presumably record- versus field-oriented layout -- confirm. */
+  enum MemoryFormat {
+    RECORD, FIELD
+  }
+
+  /** @return the read entries this engine derives from the given scan */
+  Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+
+  /** @return a reader for the given read entry */
+  RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException;
+
+  /** @return a recorder for the given store operation */
+  RecordRecorder getWriter(Store store) throws IOException;
+
+  /** Marker type for the entries exchanged between {@link #getReadEntries} and {@link #getReader}. */
+  interface ReadEntry{}
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java index 971e8ce..6603851 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java @@ -67,6 +67,16 @@ public abstract class BaseDataValue implements DataValue{ public abstract int hashCode(); @Override + public boolean equals(Object obj) { + if(obj instanceof DataValue){ + return this.equals((DataValue) obj); + }else{ + return false; + } + } + + + @Override public abstract DataValue copy(); } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
new file mode 100644
index 0000000..19efcd4
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import java.util.Arrays;
+
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+
+/**
+ * Fixed-size tuple of {@link DataValue}s with array-based {@code equals}/{@code hashCode}.
+ *
+ * <p>A set built from evaluators can refresh its values via {@link #grabValues()}; a set
+ * obtained from {@link #cloneValuesOnly()} has no evaluators and only holds values.
+ */
+public class DataValueSet {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataValueSet.class);
+
+  private final DataValue[] values;
+  private final BasicEvaluator[] evaluators;
+
+  /** Creates a set with one (initially {@code null}) value slot per evaluator. */
+  public DataValueSet(BasicEvaluator... evaluators){
+    this.evaluators = evaluators;
+    this.values = new DataValue[evaluators.length];
+  }
+
+  /** Snapshot constructor: copies the value array and leaves the evaluators unset. */
+  private DataValueSet(DataValue[] values){
+    this.values = Arrays.copyOf(values, values.length);
+    this.evaluators = null;
+  }
+
+  /** Evaluates each evaluator and stores its result in the slot with the same index. */
+  public void grabValues(){
+    int slot = 0;
+    while(slot < values.length){
+      values[slot] = evaluators[slot].eval();
+      slot++;
+    }
+  }
+
+  /** Copies every value of {@code set} into this set, starting at index 0. */
+  public void copyFrom(DataValueSet set){
+    DataValue[] source = set.values;
+    System.arraycopy(source, 0, values, 0, source.length);
+  }
+
+  @Override
+  public int hashCode() {
+    // Same value as the generated "prime * 1 + hash" form with prime = 31.
+    return 31 + Arrays.hashCode(values);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || getClass() != obj.getClass()) return false;
+    return Arrays.equals(values, ((DataValueSet) obj).values);
+  }
+
+  @Override
+  public String toString() {
+    return "DataValueSet [values=" + Arrays.toString(values) + "]";
+  }
+
+  /** @return a new set holding a copy of the current value array, without evaluators */
+  public DataValueSet cloneValuesOnly(){
+    return new DataValueSet(this.values);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java index f690c67..88efb92 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java @@ -54,7 +54,7 @@ public abstract class NumericValue extends BaseDataValue implements ComparableVa case INT: return Integer.compare(this.getAsInt(), other.getAsInt()); case LONG: 
- return Long.compare(this.getAsInt(), other.getAsInt()); + return Long.compare(this.getAsLong(), other.getAsLong()); default: throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java index 2a5c1de..4df146c 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java @@ -414,7 +414,7 @@ public final class ScalarValues { public String toString() { return "DoubleScalar [d=" + d + "]"; } - + @Override public int hashCode() { return getHashCode(d); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf b/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf new file mode 100644 index 0000000..c3b50e8 --- /dev/null +++ b/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf @@ -0,0 +1,4 @@ +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. +// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. 
+drill.storage.packages += "org.apache.drill.exec.ref.rse"
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
new file mode 100644
index 0000000..e15c568
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref;
+
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
+import org.apache.drill.exec.ref.rse.RSERegistry;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/** Smoke test that runs the bundled {@code /simple_plan.json} through the reference interpreter. */
+public class RunSimplePlan{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunSimplePlan.class);
+
+  /**
+   * Parses the plan, executes it, and checks the number of outcomes and the record count of
+   * the first outcome.
+   */
+  @Test
+  public void parseSimplePlan() throws Exception{
+    DrillConfig config = DrillConfig.create();
+    LogicalPlan plan = LogicalPlan.parse(config, Files.toString(FileUtils.getResourceAsFile("/simple_plan.json"), Charsets.UTF_8));
+    IteratorRegistry ir = new IteratorRegistry();
+    ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
+    i.setup();
+    Collection<RunOutcome> outcomes = i.run();
+    // Expected value goes first so that JUnit failure messages read correctly.
+    assertEquals(1, outcomes.size());
+    assertEquals(2, outcomes.iterator().next().records);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json b/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json index 936c094..34297b4 100644 --- a/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json +++ b/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json @@ -7,13 +7,19 @@ info:"na" } }, - sources:[ + storage:[ { - type:"json", - name:"donuts-json", - files:[ - "src/test/resources/donuts.json" - ] + type:"console", + name:"console" + }, + { + type:"fs", + name:"fs1", + root:"file:///" + }, + 
{ + type:"classpath", + name:"cp" } ], query:[ @@ -24,33 +30,35 @@ op: "scan", memo: "initial_scan", ref: "donuts", - source: "donuts-json", - selection: {data: "activity"} + storageengine: "cp", + selection: { + path: "/donuts.json", + type: "JSON" + } }, - { + { op: "transform", transforms: [ - { ref: "donuts.quantity", expr: "donuts.sales"} + { ref: "quantity", expr: "donuts.sales"} ] }, { op: "filter", expr: "donuts.ppu < 1.00" - }, + }, { - op: "group", - groupings: [ - { ref: "donuts.ppu", expr: "donuts.ppu" } - ] + op: "segment", + ref: "ppusegment", + exprs: ["donuts.ppu"] }, { - op: "aggregate", - type: "simple", - keys: ["donuts.ppu"], + op: "collapsingaggregate", + within: "ppusegment", + carryovers: ["donuts.ppu"], aggregations: [ { ref: "donuts.typeCount", expr: "count(1)" }, - { ref: "donuts.quantity", expr: "sum(donuts.sales)" }, - { ref: "donuts.sales", expr: "sum(donuts.ppu * donuts.sales)" } + { ref: "donuts.quantity", expr: "sum(quantity)" }, + { ref: "donuts.sales", expr: "sum(donuts.ppu * quantity)" } ] }, { @@ -58,7 +66,7 @@ orderings: [ {order: "desc", expr: "donuts.ppu" } ] - }, + }, { op: "project", projections: [ @@ -68,12 +76,13 @@ { op: "limit", first: 0, - last: 2 - }, + last: 100 + }, { - op: "write", + op: "store", memo: "output sink", - file: "console:///stdout" + storageengine: "console", + target: {pipe: "STD_OUT"} } ] } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/pom.xml ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/pom.xml b/sandbox/prototype/sqlparser/pom.xml index 733fb1b..fabc44d 100644 --- a/sandbox/prototype/sqlparser/pom.xml +++ b/sandbox/prototype/sqlparser/pom.xml @@ -37,6 +37,14 @@ <version>1.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>ref</artifactId> + <version>1.0-SNAPSHOT</version> + <classifier>test</classifier> + <scope>test</scope> + 
</dependency> + + <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>2.6.1</version> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java index eae7fdb..f530ea8 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java @@ -17,15 +17,23 @@ ******************************************************************************/ package org.apache.drill.jdbc; +import java.lang.reflect.Type; +import java.util.Collections; + import net.hydromatic.linq4j.BaseQueryable; import net.hydromatic.linq4j.Enumerator; import net.hydromatic.linq4j.Linq4j; import net.hydromatic.linq4j.expressions.Expression; import net.hydromatic.linq4j.expressions.Expressions; import net.hydromatic.linq4j.expressions.MethodCallExpression; -import net.hydromatic.optiq.*; +import net.hydromatic.optiq.BuiltinMethod; +import net.hydromatic.optiq.DataContext; +import net.hydromatic.optiq.MutableSchema; +import net.hydromatic.optiq.Schema; +import net.hydromatic.optiq.TranslatableTable; -import org.apache.drill.common.logical.sources.DataSource; +import org.apache.drill.common.logical.StorageEngineConfig; +import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig; import org.apache.drill.optiq.DrillOptiq; import org.apache.drill.optiq.DrillScan; import org.eigenbase.rel.RelNode; @@ -34,9 +42,6 @@ import org.eigenbase.reltype.RelDataType; import org.eigenbase.reltype.RelDataTypeFactory; import org.eigenbase.sql.type.SqlTypeName; -import java.lang.reflect.Type; -import java.util.Collections; - /** Optiq 
Table used by Drill. */ public class DrillTable extends BaseQueryable<Object> implements TranslatableTable<Object> @@ -44,7 +49,8 @@ public class DrillTable extends BaseQueryable<Object> private final Schema schema; private final String name; private final RelDataType rowType; - public final DataSource dataSource; + public final StorageEngineConfig storageEngineConfig; + public final Object selection; /** Creates a DrillTable. */ public DrillTable(Schema schema, @@ -52,16 +58,19 @@ public class DrillTable extends BaseQueryable<Object> Expression expression, RelDataType rowType, String name, - DataSource dataSource) { + StorageEngineConfig storageEngineConfig, + Object selection + ) { super(schema.getQueryProvider(), elementType, expression); this.schema = schema; this.name = name; this.rowType = rowType; - this.dataSource = dataSource; + this.storageEngineConfig = storageEngineConfig; + this.selection = selection; } static void addTable(RelDataTypeFactory typeFactory, MutableSchema schema, - String name, DataSource dataSource) { + String name, StorageEngineConfig storageEngineConfig, Object selection) { final MethodCallExpression call = Expressions.call(schema.getExpression(), BuiltinMethod.DATA_CONTEXT_GET_TABLE.method, Expressions.constant(name), @@ -72,7 +81,7 @@ public class DrillTable extends BaseQueryable<Object> typeFactory.createSqlType(SqlTypeName.ANY)), Collections.singletonList("_extra")); final DrillTable table = - new DrillTable(schema, Object.class, call, rowType, name, dataSource); + new DrillTable(schema, Object.class, call, rowType, name, storageEngineConfig, selection); schema.addTable(name, table); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java 
b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java index 73cdcd8..0a0594c 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java @@ -17,14 +17,21 @@ ******************************************************************************/ package org.apache.drill.jdbc; -import net.hydromatic.optiq.*; -import net.hydromatic.optiq.impl.java.MapSchema; -import net.hydromatic.optiq.jdbc.*; -import org.apache.drill.common.logical.sources.JSONDataSource; - import java.sql.SQLException; import java.util.Collections; +import net.hydromatic.optiq.MutableSchema; +import net.hydromatic.optiq.impl.java.MapSchema; +import net.hydromatic.optiq.jdbc.DriverVersion; +import net.hydromatic.optiq.jdbc.Handler; +import net.hydromatic.optiq.jdbc.HandlerImpl; +import net.hydromatic.optiq.jdbc.OptiqConnection; +import net.hydromatic.optiq.jdbc.UnregisteredDriver; + +import org.apache.drill.exec.ref.rops.DataWriter.ConverterType; +import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig; +import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathRSEConfig; + /** * JDBC driver for Apache Drill. */ @@ -66,11 +73,12 @@ public class Driver extends UnregisteredDriver { MapSchema.create(connection, rootSchema, schemaName); connection.setSchema(schemaName); + final ClasspathRSEConfig rseConfig = new ClasspathRSEConfig("donuts-json"); + final ClasspathInputConfig inputConfig = new ClasspathInputConfig(); + inputConfig.path = "/donuts.json"; + inputConfig.type = ConverterType.JSON; + - final JSONDataSource dataSource = new JSONDataSource(); - dataSource.name = "donuts-json"; - dataSource.files = - Collections.singletonList("src/test/resources/donuts.json"); // "tables" is a temporary parameter. 
We should replace with // "schemaUri", which is the URI of a schema.json file, or the name of a @@ -82,7 +90,7 @@ public class Driver extends UnregisteredDriver { final String[] tables2 = tables.split(","); for (String table : tables2) { DrillTable.addTable(connection.getTypeFactory(), schema, table, - dataSource); + rseConfig, inputConfig); } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java index f4ca721..445b118 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java @@ -17,7 +17,10 @@ ******************************************************************************/ package org.apache.drill.optiq; +import org.apache.drill.exec.ref.rse.QueueRSE.QueueOutputInfo; + import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -51,15 +54,24 @@ public class DrillImplementor { // TODO: populate sources based on the sources of scans that occur in // the query final ArrayNode sourcesNode = mapper.createArrayNode(); - rootNode.put("sources", sourcesNode); - final ObjectNode sourceNode = mapper.createObjectNode(); - sourcesNode.add(sourceNode); - sourceNode.put("type", "json"); - sourceNode.put("name", "donuts-json"); - final ArrayNode filesNode = mapper.createArrayNode(); - sourceNode.put("files", filesNode); - filesNode.add("src/test/resources/donuts.json"); + 
rootNode.put("storage", sourcesNode); + + // input file source + { + final ObjectNode sourceNode = mapper.createObjectNode(); + sourceNode.put("name", "donuts-json"); + sourceNode.put("type", "classpath"); + sourcesNode.add(sourceNode); + } + { + final ObjectNode sourceNode = mapper.createObjectNode(); + sourceNode.put("name", "queue"); + sourceNode.put("type", "queue"); + sourcesNode.add(sourceNode); + } + + final ArrayNode queryNode = mapper.createArrayNode(); rootNode.put("query", queryNode); @@ -76,9 +88,12 @@ public class DrillImplementor { // Add a last node, to write to the output queue. final ObjectNode writeOp = mapper.createObjectNode(); - writeOp.put("op", "write"); + writeOp.put("op", "store"); + writeOp.put("storageengine", "queue"); writeOp.put("memo", "output sink"); - writeOp.put("file", "socket:0"); + QueueOutputInfo output = new QueueOutputInfo(); + output.number = 0; + writeOp.put("target", mapper.convertValue(output, JsonNode.class)); add(writeOp); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java index 87a32ad..c18baaf 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java @@ -1,14 +1,15 @@ package org.apache.drill.optiq; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.drill.jdbc.DrillTable; - import org.eigenbase.rel.TableAccessRelBase; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelOptTable; import org.eigenbase.relopt.RelTraitSet; +import com.fasterxml.jackson.databind.JsonNode; 
+import com.fasterxml.jackson.databind.node.ObjectNode; + /** * Scan of a Drill table. */ @@ -37,7 +38,8 @@ public class DrillScan extends TableAccessRelBase implements DrillRel { node.put("op", "scan"); node.put("memo", "initial_scan"); node.put("ref", "donuts"); - node.put("source", drillTable.dataSource.getName()); + node.put("storageengine", drillTable.storageEngineConfig.getName()); + node.put("selection", implementor.mapper.convertValue(drillTable.selection, JsonNode.class)); implementor.add(node); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java index 15fe06d..016a386 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java @@ -17,24 +17,42 @@ ******************************************************************************/ package org.apache.drill.optiq; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; 
+import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import net.hydromatic.linq4j.AbstractEnumerable; import net.hydromatic.linq4j.Enumerable; import net.hydromatic.linq4j.Enumerator; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.exec.ref.IteratorRegistry; import org.apache.drill.exec.ref.ReferenceInterpreter; import org.apache.drill.exec.ref.RunOutcome; import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory; +import org.apache.drill.exec.ref.rse.RSERegistry; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * Runtime helper that executes a Drill query and converts it into an @@ -44,8 +62,9 @@ public class EnumerableDrill<E> extends AbstractEnumerable<E> implements Enumerable<E> { private final LogicalPlan plan; - final BlockingQueue queue = new ArrayBlockingQueue(100); - + final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100); + final DrillConfig config; + private static final ObjectMapper mapper = createMapper(); /** Creates a DrillEnumerable. @@ -53,23 +72,26 @@ public class EnumerableDrill<E> * @param plan Logical plan * @param clazz Type of elements returned from enumerable */ - public EnumerableDrill(LogicalPlan plan, Class<E> clazz) { + public EnumerableDrill(DrillConfig config, LogicalPlan plan, Class<E> clazz) { this.plan = plan; + this.config = config; + config.setSinkQueues(0, queue); } /** Creates a DrillEnumerable from a plan represented as a string. 
*/ public static <E extends JsonNode> EnumerableDrill<E> of(String plan, Class<E> clazz) { - return new EnumerableDrill<E>(LogicalPlan.parse(plan), clazz); + DrillConfig config = DrillConfig.create(); + return new EnumerableDrill<E>(config, LogicalPlan.parse(config, plan), clazz); } /** Runs the plan as a background task. */ Future<Collection<RunOutcome>> runPlan( CompletionService<Collection<RunOutcome>> service) { IteratorRegistry ir = new IteratorRegistry(); - - final ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, - new BasicEvaluatorFactory(ir), Collections.singletonList((Queue) queue)); + DrillConfig config = DrillConfig.create(); + config.setSinkQueues(0, queue); + final ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config)); try { i.setup(); } catch (IOException e) { @@ -186,7 +208,7 @@ public class EnumerableDrill<E> } } - private static List array(ArrayNode node) { + private static List<Object> array(ArrayNode node) { final List<Object> list = new ArrayList<>(); for (JsonNode jsonNode : node) { list.add(wrapper(jsonNode));
