http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
new file mode 100644
index 0000000..3f86c98
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSEBase.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.exceptions.MajorException;
+import org.apache.drill.exec.ref.rops.ROP;
+
+import com.typesafe.config.Config;
+
+/**
+ * Convenience base class for {@link ReferenceStorageEngine} implementations.
+ *
+ * <p>Every capability is switched off here: the {@code supports*} probes return
+ * {@code false} and the read/write entry points throw
+ * {@link UnsupportedOperationException}.  Subclasses override only the
+ * operations they actually provide.
+ */
+public abstract class RSEBase implements ReferenceStorageEngine{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSEBase.class);
+
+  /** @return {@code false}; this base class provides no read support. */
+  @Override
+  public boolean supportsRead() {
+    return false;
+  }
+
+  /** @return {@code false}; this base class provides no write support. */
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  /**
+   * Always fails in this base class.
+   *
+   * @throws UnsupportedOperationException always, naming the concrete engine class
+   */
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    throw unsupported("%s does not support reads.");
+  }
+
+  /**
+   * Always fails in this base class.
+   *
+   * @throws UnsupportedOperationException always, naming the concrete engine class
+   */
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    throw unsupported("%s does not support reads.");
+  }
+
+  /**
+   * Always fails in this base class.
+   *
+   * @throws UnsupportedOperationException always, naming the concrete engine class
+   */
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    throw unsupported("%s does not support writes.");
+  }
+
+  /**
+   * Scans the packages listed under
+   * {@link ExecRefConstants#STORAGE_ENGINE_SCAN_PACKAGES} for
+   * {@link ReferenceStorageEngine} implementations.
+   *
+   * @param config configuration supplying the list of packages to scan
+   * @return the discovered implementation classes as an array
+   */
+  public static Class<?>[] getSubTypes(Config config){
+    Collection<Class<? extends ReferenceStorageEngine>> implementations = PathScanner.scanForImplementations(
+        ReferenceStorageEngine.class,
+        config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    return implementations.toArray(new Class<?>[implementations.size()]);
+  }
+
+  /**
+   * Narrows a generic {@link ReadEntry} to the concrete entry type an engine expects.
+   *
+   * @param c     the entry type required by the calling engine
+   * @param entry the entry handed to the engine
+   * @return {@code entry}, typed as {@code T}
+   * @throws MajorException if {@code entry} is not an instance of {@code c}
+   */
+  protected <T extends ReadEntry> T getReadEntry(Class<T> c, ReadEntry entry){
+    final Class<?> actualType = entry.getClass();
+    if(c.isAssignableFrom(actualType)){
+      return c.cast(entry);
+    }
+    throw new MajorException(String.format("Expected entry type was invalid.  Expected entry of type %s but received type of %s.", c.getCanonicalName(), actualType.getCanonicalName()));
+  }
+
+  /** Builds the exception for an unsupported operation; {@code template} receives the concrete class name. */
+  private UnsupportedOperationException unsupported(String template){
+    return new UnsupportedOperationException(String.format(template, this.getClass().getCanonicalName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
new file mode 100644
index 0000000..4266aac
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RSERegistry.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+import com.typesafe.config.Config;
+
+/**
+ * Registry that discovers {@link ReferenceStorageEngine} implementations on the
+ * class path and hands out engine instances for storage engine configurations.
+ *
+ * <p>Discovery indexes every usable constructor by the concrete
+ * {@link StorageEngineConfig} type it takes as its first parameter.  Engines are
+ * created lazily by {@link #getEngine(StorageEngineConfig)} and reused for equal
+ * configurations.
+ *
+ * <p>This class performs no synchronization of its own.
+ */
+public class RSERegistry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSERegistry.class);
+
+  /** Usable engine constructors keyed by the config class they accept as first parameter. */
+  private final Map<Object, Constructor<? extends ReferenceStorageEngine>> availableEngines = new HashMap<>();
+  /** Engines already instantiated, keyed by the configuration they were built from. */
+  private final Map<StorageEngineConfig, ReferenceStorageEngine> activeEngines = new HashMap<>();
+  private final DrillConfig config;
+
+  /**
+   * Creates the registry and immediately scans for available engines.
+   *
+   * @param config configuration used for scanning and later passed to engine constructors
+   */
+  public RSERegistry(DrillConfig config){
+    this.config = config;
+    setup(config);
+  }
+
+  /**
+   * Scans the packages listed under
+   * {@link ExecRefConstants#STORAGE_ENGINE_SCAN_PACKAGES} and registers every
+   * public two-argument constructor whose first parameter is a
+   * {@link StorageEngineConfig} subtype and whose second parameter can accept a
+   * {@link DrillConfig}.
+   *
+   * @param config configuration supplying the list of packages to scan
+   */
+  @SuppressWarnings("unchecked")
+  public void setup(DrillConfig config){
+    Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage engines {}", engines);
+    for(Class<? extends ReferenceStorageEngine> engine: engines){
+      int i =0;
+      for(Constructor<?> c : engine.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        // getEngine() invokes the constructor with (engineConfig, DrillConfig), so the
+        // second parameter must be able to receive a DrillConfig.  The previous check
+        // (params[1] == Config.class) rejected exactly the documented signature and
+        // let constructors with unrelated second parameters through.
+        if(params.length != 2 || !params[1].isAssignableFrom(DrillConfig.class) || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+          logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, Config)]", c, engine);
+          continue;
+        }
+        availableEngines.put(params[0], (Constructor<? extends ReferenceStorageEngine>) c);
+        i++;
+      }
+      if(i == 0){
+        logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorageEngineConfig, Config)", engine.getCanonicalName());
+      }
+    }
+  }
+
+  /**
+   * Returns the engine for the given configuration, creating it on first use.
+   *
+   * @param engineConfig configuration identifying the engine; its concrete class selects the constructor
+   * @return the cached engine for an equal configuration, or a newly created one
+   * @throws SetupException if no constructor is registered for the configuration's class,
+   *         or if instantiation fails (a {@code SetupException} thrown by the constructor
+   *         itself is rethrown as-is)
+   */
+  public ReferenceStorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
+    ReferenceStorageEngine engine = activeEngines.get(engineConfig);
+    if(engine != null) return engine;
+    Constructor<? extends ReferenceStorageEngine> c = availableEngines.get(engineConfig.getClass());
+    if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
+    try {
+      engine = c.newInstance(engineConfig, config);
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // Unwrap reflection's wrapper so callers see the constructor's own failure.
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
+      if(t instanceof SetupException) throw ((SetupException) t);
+      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
+    }
+    // Remember the instance; without this the lookup above could never hit and
+    // every call would construct a fresh engine.
+    activeEngines.put(engineConfig, engine);
+    return engine;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java
new file mode 100644
index 0000000..b7840bc
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordReader.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/**
+ * Read-side handle returned by a storage engine; exposes its records through a
+ * {@link RecordIterator}.
+ */
+public interface RecordReader {
+
+  /** @return the iterator over this reader's records */
+  RecordIterator getIterator();
+
+  /**
+   * Lifecycle hook.
+   * NOTE(review): presumably invoked once before records are read -- confirm against callers.
+   */
+  void setup();
+
+  /**
+   * Lifecycle hook.
+   * NOTE(review): presumably invoked once reading is finished to release resources -- confirm against callers.
+   */
+  void cleanup();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
new file mode 100644
index 0000000..9527b0b
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/RecordRecorder.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome;
+
+/**
+ * Write-side handle returned by a storage engine; receives records one at a
+ * time and is told the run outcome when writing ends.
+ */
+public interface RecordRecorder {
+  // Interface fields are implicitly public static final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
+
+  /**
+   * Lifecycle hook.
+   * NOTE(review): presumably invoked once before the first record is written -- confirm against callers.
+   *
+   * @throws IOException if the recorder cannot be prepared
+   */
+  void setup() throws IOException;
+
+  /**
+   * Records a single record.
+   *
+   * @param pointer the record to write
+   * @return a {@code long} whose meaning is not visible here
+   *         (NOTE(review): possibly a byte or record count -- confirm with implementations)
+   * @throws IOException if the record cannot be written
+   */
+  long recordRecord(RecordPointer pointer) throws IOException;
+
+  /**
+   * Signals the end of writing.
+   *
+   * @param outcome how the run ended
+   * @throws IOException if finishing the output fails
+   */
+  void finish(RunOutcome.OutcomeType outcome) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
new file mode 100644
index 0000000..41cba45
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/ReferenceStorageEngine.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.rops.ROP;
+
+
+public interface ReferenceStorageEngine {
+  public boolean supportsRead();
+  public boolean supportsWrite();
+
+  public enum PartitionCapabilities {
+    NONE, HASH, SORTED;
+  }
+
+  public enum MemoryFormat {
+    RECORD, FIELD;
+  }
+
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws 
IOException;
+  public RecordRecorder getWriter(Store store) throws IOException;
+
+  public interface ReadEntry{}
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
index 971e8ce..6603851 100644
--- 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
@@ -67,6 +67,16 @@ public abstract class BaseDataValue implements DataValue{
   public abstract int hashCode();
 
   @Override
+  public boolean equals(Object obj) {
+    // Anything that is not a DataValue (including null) is unequal; otherwise
+    // hand off to the DataValue-typed comparison.
+    // NOTE(review): relies on an equals(DataValue) overload, presumably declared
+    // on DataValue -- confirm, since without it this call would recurse.
+    return obj instanceof DataValue && this.equals((DataValue) obj);
+  }
+  
+
+  @Override
   public abstract DataValue copy();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
new file mode 100644
index 0000000..19efcd4
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValueSet.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import java.util.Arrays;
+
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+
+/**
+ * Fixed-size tuple of {@link DataValue}s with array-content equality and
+ * hashing.
+ *
+ * <p>An instance built from evaluators can refresh its slots with
+ * {@link #grabValues()}.  An instance produced by {@link #cloneValuesOnly()}
+ * holds a copy of the values but has no evaluators.
+ */
+public class DataValueSet {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataValueSet.class);
+
+  private final DataValue[] values;
+  /** {@code null} for value-only copies. */
+  private final BasicEvaluator[] evaluators;
+
+  /**
+   * Creates a set with one (initially {@code null}) slot per evaluator.
+   *
+   * @param evaluators the evaluators that fill the slots, in slot order
+   */
+  public DataValueSet(BasicEvaluator... evaluators){
+    this.evaluators = evaluators;
+    this.values = new DataValue[evaluators.length];
+  }
+
+  /** Value-only constructor: copies the array and keeps no evaluators. */
+  private DataValueSet(DataValue[] values){
+    this.evaluators = null;
+    this.values = Arrays.copyOf(values, values.length, DataValue[].class);
+  }
+
+  /**
+   * Evaluates every evaluator in order and stores each result in its slot.
+   * Must not be called on a value-only copy with at least one slot, because
+   * such a copy has no evaluators.
+   */
+  public void grabValues(){
+    int slot = 0;
+    while(slot < values.length){
+      values[slot] = evaluators[slot].eval();
+      slot++;
+    }
+  }
+
+  /**
+   * Overwrites the leading slots of this set with all values of {@code set}.
+   *
+   * @param set the set whose values are copied (by reference) into this one
+   */
+  public void copyFrom(DataValueSet set){
+    final DataValue[] source = set.values;
+    System.arraycopy(source, 0, values, 0, source.length);
+  }
+
+  /** Hash derived from the values only. */
+  @Override
+  public int hashCode() {
+    return 31 + Arrays.hashCode(values);
+  }
+
+  /** Two sets are equal when they have the same class and element-wise equal values. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || getClass() != obj.getClass()) return false;
+    return Arrays.equals(values, ((DataValueSet) obj).values);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("DataValueSet [values=")
+        .append(Arrays.toString(values))
+        .append("]")
+        .toString();
+  }
+
+  /**
+   * @return a new set holding a copy of the current value array and no evaluators
+   */
+  public DataValueSet cloneValuesOnly(){
+    return new DataValueSet(values);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java
index f690c67..88efb92 100644
--- 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/NumericValue.java
@@ -54,7 +54,7 @@ public abstract class NumericValue extends BaseDataValue 
implements ComparableVa
     case INT:
       return Integer.compare(this.getAsInt(), other.getAsInt());
     case LONG:
-      return Long.compare(this.getAsInt(), other.getAsInt());
+      return Long.compare(this.getAsLong(), other.getAsLong());
     default:
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
index 2a5c1de..4df146c 100644
--- 
a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
+++ 
b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
@@ -414,7 +414,7 @@ public final class ScalarValues {
     public String toString() {
       return "DoubleScalar [d=" + d + "]";
     }
-    
+ 
     @Override
     public int hashCode() {
       return getHashCode(d);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf 
b/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..c3b50e8
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/resources/drill-module.conf
@@ -0,0 +1,4 @@
+//  This file tells Drill to consider this module when class path scanning.  
+//  This file can also include any supplementary configuration information.  
+//  This file is in HOCON format, see 
https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.storage.packages += "org.apache.drill.exec.ref.rse"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
 
b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
new file mode 100644
index 0000000..e15c568
--- /dev/null
+++ 
b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/RunSimplePlan.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.ref;
+
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
+import org.apache.drill.exec.ref.rse.RSERegistry;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/**
+ * Smoke test that parses the {@code /simple_plan.json} resource and runs it
+ * through the {@link ReferenceInterpreter}.
+ */
+public class RunSimplePlan{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunSimplePlan.class);
+
+  /**
+   * Runs the simple plan end to end and checks that it yields exactly one
+   * outcome reporting two records.
+   */
+  @Test
+  public void parseSimplePlan() throws Exception{
+    DrillConfig config = DrillConfig.create();
+    LogicalPlan plan = LogicalPlan.parse(config, Files.toString(FileUtils.getResourceAsFile("/simple_plan.json"), Charsets.UTF_8));
+    IteratorRegistry ir = new IteratorRegistry();
+    ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
+    i.setup();
+    Collection<RunOutcome> outcomes = i.run();
+    // JUnit's signature is assertEquals(expected, actual); keeping that order makes
+    // a failure report "expected:<1> but was:<n>" instead of the reverse.
+    assertEquals(1, outcomes.size());
+    assertEquals(2, outcomes.iterator().next().records);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json 
b/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json
index 936c094..34297b4 100644
--- a/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json
+++ b/sandbox/prototype/exec/ref/src/test/resources/simple_plan.json
@@ -7,13 +7,19 @@
       info:"na"
     }
   },
-  sources:[
+  storage:[
     {
-      type:"json",
-      name:"donuts-json",
-      files:[
-        "src/test/resources/donuts.json"
-      ]
+      type:"console",
+      name:"console"
+    },
+    {
+      type:"fs",
+      name:"fs1",
+      root:"file:///"
+    },
+    {
+      type:"classpath",
+      name:"cp"
     }
   ],
   query:[
@@ -24,33 +30,35 @@
              op: "scan",
              memo: "initial_scan",
              ref: "donuts",
-             source: "donuts-json",
-             selection: {data: "activity"}
+             storageengine: "cp",
+             selection: {
+               path: "/donuts.json",
+               type: "JSON"
+             }
            },
-           {
+        {
              op: "transform",
              transforms: [
-               { ref: "donuts.quantity", expr: "donuts.sales"}
+               { ref: "quantity", expr: "donuts.sales"}
              ]
            },
            {
              op: "filter",
              expr: "donuts.ppu < 1.00"
-           },
+           }, 
            {
-             op: "group",
-             groupings: [
-               { ref: "donuts.ppu", expr: "donuts.ppu" }
-             ]
+             op: "segment",
+             ref: "ppusegment",
+             exprs: ["donuts.ppu"]
            },
            {
-             op: "aggregate",
-             type: "simple",
-             keys: ["donuts.ppu"],
+             op: "collapsingaggregate",
+             within: "ppusegment",
+             carryovers: ["donuts.ppu"], 
              aggregations: [
                { ref: "donuts.typeCount",  expr: "count(1)" },
-               { ref: "donuts.quantity",  expr: "sum(donuts.sales)" },
-               { ref: "donuts.sales",  expr: "sum(donuts.ppu * donuts.sales)" }
+               { ref: "donuts.quantity",  expr: "sum(quantity)" },
+               { ref: "donuts.sales",  expr: "sum(donuts.ppu * quantity)" }
              ]
            },
            {
@@ -58,7 +66,7 @@
              orderings: [
                {order: "desc", expr: "donuts.ppu" }
              ]
-           },
+           }, 
            {
              op: "project",
              projections: [
@@ -68,12 +76,13 @@
            {
           op: "limit",
           first: 0,
-          last: 2
-        },
+          last: 100
+        }, 
            {
-             op: "write",
+             op: "store",
              memo: "output sink",
-             file: "console:///stdout"
+             storageengine: "console",
+             target: {pipe: "STD_OUT"}
            }
       ]
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/pom.xml 
b/sandbox/prototype/sqlparser/pom.xml
index 733fb1b..fabc44d 100644
--- a/sandbox/prototype/sqlparser/pom.xml
+++ b/sandbox/prototype/sqlparser/pom.xml
@@ -37,6 +37,14 @@
       <version>1.0-SNAPSHOT</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>ref</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>    
+    
+    <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
       <version>2.6.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
 
b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
index eae7fdb..f530ea8 100644
--- 
a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
+++ 
b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
@@ -17,15 +17,23 @@
  
******************************************************************************/
 package org.apache.drill.jdbc;
 
+import java.lang.reflect.Type;
+import java.util.Collections;
+
 import net.hydromatic.linq4j.BaseQueryable;
 import net.hydromatic.linq4j.Enumerator;
 import net.hydromatic.linq4j.Linq4j;
 import net.hydromatic.linq4j.expressions.Expression;
 import net.hydromatic.linq4j.expressions.Expressions;
 import net.hydromatic.linq4j.expressions.MethodCallExpression;
-import net.hydromatic.optiq.*;
+import net.hydromatic.optiq.BuiltinMethod;
+import net.hydromatic.optiq.DataContext;
+import net.hydromatic.optiq.MutableSchema;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.TranslatableTable;
 
-import org.apache.drill.common.logical.sources.DataSource;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig;
 import org.apache.drill.optiq.DrillOptiq;
 import org.apache.drill.optiq.DrillScan;
 import org.eigenbase.rel.RelNode;
@@ -34,9 +42,6 @@ import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.reltype.RelDataTypeFactory;
 import org.eigenbase.sql.type.SqlTypeName;
 
-import java.lang.reflect.Type;
-import java.util.Collections;
-
 /** Optiq Table used by Drill. */
 public class DrillTable extends BaseQueryable<Object>
     implements TranslatableTable<Object>
@@ -44,7 +49,8 @@ public class DrillTable extends BaseQueryable<Object>
   private final Schema schema;
   private final String name;
   private final RelDataType rowType;
-  public final DataSource dataSource;
+  public final StorageEngineConfig storageEngineConfig;
+  public final Object selection;
 
   /** Creates a DrillTable. */
   public DrillTable(Schema schema,
@@ -52,16 +58,19 @@ public class DrillTable extends BaseQueryable<Object>
       Expression expression,
       RelDataType rowType,
       String name,
-      DataSource dataSource) {
+      StorageEngineConfig storageEngineConfig,
+      Object selection
+      ) {
     super(schema.getQueryProvider(), elementType, expression);
     this.schema = schema;
     this.name = name;
     this.rowType = rowType;
-    this.dataSource = dataSource;
+    this.storageEngineConfig = storageEngineConfig;
+    this.selection = selection;
   }
 
   static void addTable(RelDataTypeFactory typeFactory, MutableSchema schema,
-      String name, DataSource dataSource) {
+      String name, StorageEngineConfig storageEngineConfig, Object selection) {
     final MethodCallExpression call = Expressions.call(schema.getExpression(),
         BuiltinMethod.DATA_CONTEXT_GET_TABLE.method,
         Expressions.constant(name),
@@ -72,7 +81,7 @@ public class DrillTable extends BaseQueryable<Object>
                 typeFactory.createSqlType(SqlTypeName.ANY)),
             Collections.singletonList("_extra"));
     final DrillTable table =
-        new DrillTable(schema, Object.class, call, rowType, name, dataSource);
+        new DrillTable(schema, Object.class, call, rowType, name, storageEngineConfig, selection);
     schema.addTable(name, table);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java
index 73cdcd8..0a0594c 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/Driver.java
@@ -17,14 +17,21 @@
  ******************************************************************************/
 package org.apache.drill.jdbc;
 
-import net.hydromatic.optiq.*;
-import net.hydromatic.optiq.impl.java.MapSchema;
-import net.hydromatic.optiq.jdbc.*;
-import org.apache.drill.common.logical.sources.JSONDataSource;
-
 import java.sql.SQLException;
 import java.util.Collections;
 
+import net.hydromatic.optiq.MutableSchema;
+import net.hydromatic.optiq.impl.java.MapSchema;
+import net.hydromatic.optiq.jdbc.DriverVersion;
+import net.hydromatic.optiq.jdbc.Handler;
+import net.hydromatic.optiq.jdbc.HandlerImpl;
+import net.hydromatic.optiq.jdbc.OptiqConnection;
+import net.hydromatic.optiq.jdbc.UnregisteredDriver;
+
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig;
+import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathRSEConfig;
+
 /**
  * JDBC driver for Apache Drill.
  */
@@ -66,11 +73,12 @@ public class Driver extends UnregisteredDriver {
           MapSchema.create(connection, rootSchema, schemaName);
 
       connection.setSchema(schemaName);
+      final ClasspathRSEConfig rseConfig = new ClasspathRSEConfig("donuts-json");
+      final ClasspathInputConfig inputConfig = new ClasspathInputConfig();
+      inputConfig.path = "/donuts.json";
+      inputConfig.type = ConverterType.JSON; 
+      
 
-      final JSONDataSource dataSource = new JSONDataSource();
-      dataSource.name = "donuts-json";
-      dataSource.files =
-          Collections.singletonList("src/test/resources/donuts.json");
 
       // "tables" is a temporary parameter. We should replace with
       // "schemaUri", which is the URI of a schema.json file, or the name of a
@@ -82,7 +90,7 @@ public class Driver extends UnregisteredDriver {
       final String[] tables2 = tables.split(",");
       for (String table : tables2) {
         DrillTable.addTable(connection.getTypeFactory(), schema, table,
-            dataSource);
+            rseConfig, inputConfig);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
index f4ca721..445b118 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
@@ -17,7 +17,10 @@
  ******************************************************************************/
 package org.apache.drill.optiq;
 
+import org.apache.drill.exec.ref.rse.QueueRSE.QueueOutputInfo;
+
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -51,15 +54,24 @@ public class DrillImplementor {
     // TODO: populate sources based on the sources of scans that occur in
     // the query
     final ArrayNode sourcesNode = mapper.createArrayNode();
-    rootNode.put("sources", sourcesNode);
-    final ObjectNode sourceNode = mapper.createObjectNode();
-    sourcesNode.add(sourceNode);
-    sourceNode.put("type", "json");
-    sourceNode.put("name", "donuts-json");
-    final ArrayNode filesNode = mapper.createArrayNode();
-    sourceNode.put("files", filesNode);
-    filesNode.add("src/test/resources/donuts.json");
+    rootNode.put("storage", sourcesNode);
+    
+    // input file source
+    {
+      final ObjectNode sourceNode = mapper.createObjectNode();
+      sourceNode.put("name", "donuts-json");
+      sourceNode.put("type", "classpath");
+      sourcesNode.add(sourceNode);
+    }
+    {
+      final ObjectNode sourceNode = mapper.createObjectNode();
+      sourceNode.put("name", "queue");
+      sourceNode.put("type", "queue");
+      sourcesNode.add(sourceNode);
+    }
+    
 
+    
     final ArrayNode queryNode = mapper.createArrayNode();
     rootNode.put("query", queryNode);
 
@@ -76,9 +88,12 @@ public class DrillImplementor {
 
     // Add a last node, to write to the output queue.
     final ObjectNode writeOp = mapper.createObjectNode();
-    writeOp.put("op", "write");
+    writeOp.put("op", "store");
+    writeOp.put("storageengine", "queue");
     writeOp.put("memo", "output sink");
-    writeOp.put("file", "socket:0");
+    QueueOutputInfo output = new QueueOutputInfo();
+    output.number = 0;
+    writeOp.put("target", mapper.convertValue(output, JsonNode.class));
     add(writeOp);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java
index 87a32ad..c18baaf 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillScan.java
@@ -1,14 +1,15 @@
 package org.apache.drill.optiq;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.drill.jdbc.DrillTable;
-
 import org.eigenbase.rel.TableAccessRelBase;
 import org.eigenbase.relopt.RelOptCluster;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelOptTable;
 import org.eigenbase.relopt.RelTraitSet;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * Scan of a Drill table.
  */
@@ -37,7 +38,8 @@ public class DrillScan extends TableAccessRelBase implements DrillRel {
     node.put("op", "scan");
     node.put("memo", "initial_scan");
     node.put("ref", "donuts");
-    node.put("source", drillTable.dataSource.getName());
+    node.put("storageengine", drillTable.storageEngineConfig.getName());
+    node.put("selection", implementor.mapper.convertValue(drillTable.selection, JsonNode.class));
     implementor.add(node);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e4a1384/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
index 15fe06d..016a386 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
@@ -17,24 +17,42 @@
  ******************************************************************************/
 package org.apache.drill.optiq;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import net.hydromatic.linq4j.AbstractEnumerable;
 import net.hydromatic.linq4j.Enumerable;
 import net.hydromatic.linq4j.Enumerator;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.exec.ref.IteratorRegistry;
 import org.apache.drill.exec.ref.ReferenceInterpreter;
 import org.apache.drill.exec.ref.RunOutcome;
 import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
+import org.apache.drill.exec.ref.rse.RSERegistry;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Runtime helper that executes a Drill query and converts it into an
@@ -44,8 +62,9 @@ public class EnumerableDrill<E>
     extends AbstractEnumerable<E>
     implements Enumerable<E> {
   private final LogicalPlan plan;
-  final BlockingQueue queue = new ArrayBlockingQueue(100);
-
+  final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
+  final DrillConfig config;
+  
   private static final ObjectMapper mapper = createMapper();
 
   /** Creates a DrillEnumerable.
@@ -53,23 +72,26 @@ public class EnumerableDrill<E>
    * @param plan Logical plan
    * @param clazz Type of elements returned from enumerable
    */
-  public EnumerableDrill(LogicalPlan plan, Class<E> clazz) {
+  public EnumerableDrill(DrillConfig config, LogicalPlan plan, Class<E> clazz) {
     this.plan = plan;
+    this.config = config;
+    config.setSinkQueues(0, queue);
   }
 
   /** Creates a DrillEnumerable from a plan represented as a string. */
   public static <E extends JsonNode> EnumerableDrill<E> of(String plan,
       Class<E> clazz) {
-    return new EnumerableDrill<E>(LogicalPlan.parse(plan), clazz);
+    DrillConfig config = DrillConfig.create();
+    return new EnumerableDrill<E>(config, LogicalPlan.parse(config, plan), clazz);
   }
 
   /** Runs the plan as a background task. */
   Future<Collection<RunOutcome>> runPlan(
       CompletionService<Collection<RunOutcome>> service) {
     IteratorRegistry ir = new IteratorRegistry();
-
-    final ReferenceInterpreter i = new ReferenceInterpreter(plan, ir,
-        new BasicEvaluatorFactory(ir), Collections.singletonList((Queue) queue));
+    DrillConfig config = DrillConfig.create();
+    config.setSinkQueues(0, queue);
+    final ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
     try {
       i.setup();
     } catch (IOException e) {
@@ -186,7 +208,7 @@ public class EnumerableDrill<E>
     }
   }
 
-  private static List array(ArrayNode node) {
+  private static List<Object> array(ArrayNode node) {
     final List<Object> list = new ArrayList<>();
     for (JsonNode jsonNode : node) {
       list.add(wrapper(jsonNode));

Reply via email to