DRILL-757: Output mutator interface changes
- Output mutator manages schema changes instead of record readers
- Removed usages of deprecated interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/828a5c69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/828a5c69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/828a5c69

Branch: refs/heads/master
Commit: 828a5c69501fd94a64359c6e3abd474f138dc92f
Parents: cb0d46f
Author: Mehant Baid <[email protected]>
Authored: Thu May 15 18:03:28 2014 -0700
Committer: Mehant Baid <[email protected]>
Committed: Sun May 18 02:34:31 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     |  4 -
 .../drill/exec/physical/impl/OutputMutator.java | 14 ++-
 .../drill/exec/physical/impl/ScanBatch.java     | 99 +++++++++++---------
 .../exec/store/easy/json/JSONRecordReader2.java |  3 +-
 .../drill/exec/store/hive/HiveRecordReader.java |  7 +-
 .../exec/store/ischema/RowRecordReader.java     | 17 +---
 .../drill/exec/store/mock/MockRecordReader.java | 18 ++--
 .../exec/store/parquet/ParquetRecordReader.java | 11 ---
 .../drill/exec/store/pojo/PojoRecordReader.java |  2 -
 .../exec/store/text/DrillTextRecordReader.java  |  6 +-
 .../drill/exec/store/TestOutputMutator.java     | 28 +++---
 .../exec/store/ischema/TestOrphanSchema.java    | 19 +++-
 .../exec/store/ischema/TestTableProvider.java   | 22 +++--
 .../store/parquet/ParquetRecordReaderTest.java  | 29 +++---
 14 files changed, 136 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index dbf8123..ae9f833 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -128,7 +128,6 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     this.outputMutator = output;
-    output.removeAllFields();
     vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
 
     try {
@@ -141,8 +140,6 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
           getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
         }
       }
-      output.setNewSchema();
-
       logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum 
'{}', port '{}', znode '{}'.",
           hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
           hbaseConf.get(HBASE_ZOOKEEPER_PORT), 
hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
@@ -227,7 +224,6 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
           v.allocateNew();
         }
         vvMap.put(column, v);
-        outputMutator.setNewSchema();
       }
       return v;
     } catch (SchemaChangeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 38d56ea..1aec625 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -22,12 +22,16 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.List;
+
 public interface OutputMutator {
-  public void removeField(MaterializedField field) throws 
SchemaChangeException;
   public <T extends ValueVector> T addField(MaterializedField field, Class<T> 
clazz) throws SchemaChangeException ;
+  public void allocate(int recordCount);
+  public boolean isNewSchema();
 
-  @Deprecated
-  public void addField(ValueVector vector) throws SchemaChangeException ;
-  public void removeAllFields();
-  public void setNewSchema() throws SchemaChangeException;
+  /* TODO: This interface is added to support information schema tables,
+   * FixedTables, the way they exist currently.
+   * One too many layers to rip out; address it as a separate JIRA.
+   */
+  public void addFields(List<ValueVector> vvList);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a49d1a8..c0810c6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -61,10 +61,11 @@ public class ScanBatch implements RecordBatch {
   private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
   final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
+  final Map<MaterializedField, Class<?>> fieldVectorClassMap = 
Maps.newHashMap();
+  private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
 
   private final VectorContainer container = new VectorContainer();
   private int recordCount;
-  private boolean schemaChanged = true;
   private final FragmentContext context;
   private final OperatorContext oContext;
   private Iterator<RecordReader> readers;
@@ -123,6 +124,7 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public IterOutcome next() {
+    mutator.allocate(MAX_RECORD_CNT);
     while ((recordCount = currentReader.next()) == 0) {
       try {
         if (!readers.hasNext()) {
@@ -133,8 +135,8 @@ public class ScanBatch implements RecordBatch {
         currentReader.cleanup();
         currentReader = readers.next();
         partitionValues = partitionColumns.hasNext() ? partitionColumns.next() 
: null;
-        mutator.removeAllFields();
         currentReader.setup(mutator);
+        mutator.allocate(MAX_RECORD_CNT);
         addPartitionVectors();
       } catch (ExecutionSetupException e) {
         this.context.fail(e);
@@ -144,28 +146,32 @@ public class ScanBatch implements RecordBatch {
     }
 
     populatePartitionVectors();
-    if (schemaChanged) {
-      schemaChanged = false;
+    if (mutator.isNewSchema()) {
+      container.buildSchema(SelectionVectorMode.NONE);
+      schema = container.getSchema();
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
       return IterOutcome.OK;
     }
   }
 
-  private void addPartitionVectors() {
-    partitionVectors = Lists.newArrayList();
-    for (int i : selectedPartitionColumns) {
-      MaterializedField field;
-      ValueVector v;
-      if (partitionValues.length > i) {
-        field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.required(MinorType.VARCHAR));
-        v = new VarCharVector(field, context.getAllocator());
-      } else {
-        field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.optional(MinorType.VARCHAR));
-        v = new NullableVarCharVector(field, context.getAllocator());
+  private void addPartitionVectors() throws ExecutionSetupException{
+    try {
+      partitionVectors = Lists.newArrayList();
+      for (int i : selectedPartitionColumns) {
+        MaterializedField field;
+        ValueVector v;
+        if (partitionValues.length > i) {
+          field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.required(MinorType.VARCHAR));
+          v = mutator.addField(field, VarCharVector.class);
+        } else {
+          field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.optional(MinorType.VARCHAR));
+          v = mutator.addField(field, NullableVarCharVector.class);
+        }
+        partitionVectors.add(v);
       }
-      mutator.addField(v);
-      partitionVectors.add(v);
+    } catch(SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
     }
   }
 
@@ -212,43 +218,52 @@ public class ScanBatch implements RecordBatch {
 
   private class Mutator implements OutputMutator {
 
-    public void removeField(MaterializedField field) throws 
SchemaChangeException {
-      ValueVector vector = fieldVectorMap.remove(field);
-      if (vector == null) throw new SchemaChangeException("Failure attempting 
to remove an unknown field.");
-      container.remove(vector);
-      vector.close();
-    }
+    boolean schemaChange = true;
 
-    public void addField(ValueVector vector) {
-      container.add(vector);
-      fieldVectorMap.put(vector.getField(), vector);
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends ValueVector> T addField(MaterializedField field, 
Class<T> clazz) throws SchemaChangeException {
+      // Check if the field exists
+      ValueVector v = fieldVectorMap.get(field);
+
+      if (v == null || v.getClass() != clazz) {
+        // Field does not exist; add it to the map and the output container
+        v = TypeHelper.getNewVector(field, oContext.getAllocator());
+        if(!clazz.isAssignableFrom(v.getClass())) throw new 
SchemaChangeException(String.format("The class that was provided %s does not 
correspond to the expected vector type of %s.", clazz.getSimpleName(), 
v.getClass().getSimpleName()));
+        container.add(v);
+        fieldVectorMap.put(field, v);
+
+        // Adding new vectors to the container marks that the schema has changed
+        schemaChange = true;
+      }
+
+      return (T) v;
     }
 
     @Override
-    public void removeAllFields() {
-      for(VectorWrapper<?> vw : container){
-        vw.clear();
+    public void addFields(List<ValueVector> vvList) {
+      for (ValueVector v : vvList) {
+        fieldVectorMap.put(v.getField(), v);
+        container.add(v);
       }
-      container.clear();
-      fieldVectorMap.clear();
+      schemaChange = true;
     }
 
     @Override
-    public void setNewSchema() throws SchemaChangeException {
-      container.buildSchema(SelectionVectorMode.NONE);
-      schema = container.getSchema();
-      ScanBatch.this.schemaChanged = true;
+    public void allocate(int recordCount) {
+      for (ValueVector v : fieldVectorMap.values()) {
+        AllocationHelper.allocate(v, recordCount, 50, 10);
+      }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public <T extends ValueVector> T addField(MaterializedField field, 
Class<T> clazz) throws SchemaChangeException {
-      ValueVector v = TypeHelper.getNewVector(field, oContext.getAllocator());
-      if(!clazz.isAssignableFrom(v.getClass())) throw new 
SchemaChangeException(String.format("The class that was provided %s does not 
correspond to the expected vector type of %s.", clazz.getSimpleName(), 
v.getClass().getSimpleName()));
-      addField(v);
-      return (T) v;
+    public boolean isNewSchema() {
+      if (schemaChange == true) {
+        schemaChange = false;
+        return true;
+      }
+      return false;
     }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index bb52a20..37624d2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -95,10 +95,9 @@ public class JSONRecordReader2 implements RecordReader{
 
 
       writer.setValueCount(i);
-      mutator.setNewSchema();
       return i;
 
-    }catch(IOException | SchemaChangeException e){
+    }catch(IOException e){
       throw new DrillRuntimeException("Failure while reading JSON file.", e);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 9e01268..4361262 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -218,7 +218,6 @@ public class HiveRecordReader implements RecordReader {
 
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
-    output.removeAllFields();
     try {
       for (int i = 0; i < columnNames.size(); i++) {
         PrimitiveCategory pCat = primitiveCategories.get(i);
@@ -230,11 +229,9 @@ public class HiveRecordReader implements RecordReader {
       for (int i = 0; i < selectedPartitionNames.size(); i++) {
         String type = selectedPartitionTypes.get(i);
         MaterializedField field = 
MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), 
Types.getMajorTypeFromName(type));
-        ValueVector vv = TypeHelper.getNewVector(field, 
context.getAllocator());
-        pVectors.add(vv);
-        output.addField(vv);
+        Class vvClass = 
TypeHelper.getValueVectorClass(field.getType().getMinorType(), 
field.getDataMode());
+        pVectors.add(output.addField(field, vvClass));
       }
-      output.setNewSchema();
     } catch(SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
index ac601d4..c578b5c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
@@ -29,8 +29,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.ValueVector;
-
-
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 
 /**
@@ -80,15 +79,8 @@ public class RowRecordReader implements RecordReader {
     
     // Inform drill of the output columns. They were set up when the vector 
handler was created.
     //  Note we are currently working with fixed tables.
-    try {
-      for (ValueVector v: batch.getValueVectors()) {
-        output.addField(v);;
-      }
-      output.setNewSchema();
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException("Failure while setting up fields", e);
-    }
-    
+    output.addFields(batch.getValueVectors());
+
     // Estimate the number of records we can hold in a RecordBatch
     maxRowCount = batch.getEstimatedRowCount(bufSize);
   }
@@ -101,9 +93,6 @@ public class RowRecordReader implements RecordReader {
   @Override
   public int next() {
     
-    // Make note are are starting a new batch of records
-    batch.beginBatch(maxRowCount);
-    
     // Repeat until out of data or vectors are full
     int actualCount;
     for (actualCount = 0; actualCount < maxRowCount && provider.hasNext(); 
actualCount++) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 5c07dc5..c7fc939 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
@@ -34,6 +35,8 @@ import 
org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.List;
+
 public class MockRecordReader implements RecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
 
@@ -76,9 +79,10 @@ public class MockRecordReader implements RecordReader {
 
       for (int i = 0; i < config.getTypes().length; i++) {
         MajorType type = config.getTypes()[i].getMajorType();
-        valueVectors[i] = 
output.addField(getVector(config.getTypes()[i].getName(), type, 
batchRecordCount), (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        MaterializedField field = getVector(config.getTypes()[i].getName(), 
type, batchRecordCount);
+        Class vvClass = 
TypeHelper.getValueVectorClass(field.getType().getMinorType(), 
field.getDataMode());
+        valueVectors[i] = output.addField(field, vvClass);
       }
-      output.setNewSchema();
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException("Failure while setting up fields", e);
     }
@@ -93,7 +97,6 @@ public class MockRecordReader implements RecordReader {
 
     recordsRead += recordSetSize;
     for(ValueVector v : valueVectors){
-      AllocationHelper.allocate(v, recordSetSize, 50, 10);
 
 //      logger.debug(String.format("MockRecordReader:  Generating %d records 
of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
       ValueVector.Mutator m = v.getMutator();
@@ -105,14 +108,5 @@ public class MockRecordReader implements RecordReader {
 
   @Override
   public void cleanup() {
-    for (int i = 0; i < valueVectors.length; i++) {
-      try {
-        output.removeField(valueVectors[i].getField());
-      } catch (SchemaChangeException e) {
-        logger.warn("Failure while trying to remove field.", e);
-      }
-      valueVectors[i].close();
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4ca13a5..9cdd205 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -274,14 +274,6 @@ public class ParquetRecordReader implements RecordReader {
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }
-
-//    output.removeAllFields();
-    try {
-      output.setNewSchema();
-    }catch(SchemaChangeException e) {
-      throw new ExecutionSetupException("Error setting up output mutator.", e);
-    }
-
   }
 
   private SchemaPath toFieldName(String[] paths) {
@@ -298,15 +290,12 @@ public class ParquetRecordReader implements RecordReader {
 
   private void resetBatch() {
     for (ColumnReader column : columnStatuses) {
-      AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5);
       column.valuesReadInCurrentPass = 0;
     }
     for (VarLengthColumn r : varLengthReader.columns){
-      AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
     for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
-      AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index fc5a9b4..1ebd1f5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -89,8 +89,6 @@ public class PojoRecordReader<T> implements RecordReader{
         }
         writers[i].init(output);
       }
-
-      output.setNewSchema();
     }catch(SchemaChangeException e){
       throw new ExecutionSetupException("Failure while setting up schema for 
PojoRecordReader.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index ef05b4c..5c3d381 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
@@ -95,12 +96,9 @@ public class DrillTextRecordReader implements RecordReader {
 
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
-    output.removeAllFields();
     MaterializedField field = MaterializedField.create(ref, 
Types.repeated(TypeProtos.MinorType.VARCHAR));
-    vector = new RepeatedVarCharVector(field, context.getAllocator());
     try {
-      output.addField(vector);
-      output.setNewSchema();
+      vector = output.addField(field, RepeatedVarCharVector.class);
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
index 37369da..0161632 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -56,18 +57,8 @@ public class TestOutputMutator implements OutputMutator, 
Iterable<VectorWrapper<
     fieldVectorMap.put(vector.getField(), vector);
   }
 
-  @Override
-  public void removeAllFields() {
-    for (VectorWrapper<?> vw : container) {
-      vw.clear();
-    }
-    container.clear();
-    fieldVectorMap.clear();
-  }
-
-  @Override
-  public void setNewSchema() throws SchemaChangeException {
-    container.buildSchema(SelectionVectorMode.NONE);
+  public void addFields(List<ValueVector> v) {
+    return;
   }
 
   public Iterator<VectorWrapper<?>> iterator() {
@@ -75,7 +66,17 @@ public class TestOutputMutator implements OutputMutator, 
Iterable<VectorWrapper<
   }
 
   public void clear(){
-    removeAllFields();
+
+  }
+
+  @Override
+  public boolean isNewSchema() {
+    return false;
+  }
+
+  @Override
+  public void allocate(int recordCount) {
+    return;
   }
 
   @Override
@@ -85,5 +86,4 @@ public class TestOutputMutator implements OutputMutator, 
Iterable<VectorWrapper<
     addField(v);
     return (T) v;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
index 3b8b57b..8eadb56 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
@@ -27,6 +27,7 @@ import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -138,18 +139,28 @@ public class TestOrphanSchema extends ExecTest {
       vectors.add(vector);
     }
 
+    public void addFields(List<ValueVector> v) {
+      return;
+    }
+
     public Object get(int column, int row) {
       return vectors.get(column).getAccessor().getObject(row);
     }
 
-    public void removeField(MaterializedField field) {}
-    public void removeAllFields() {}
-    public void setNewSchema() {}
-
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, 
Class<T> clazz) throws SchemaChangeException {
       return null;
     }
+
+    @Override
+    public void allocate(int recordCount) {
+      return;
+    }
+
+    @Override
+    public boolean isNewSchema() {
+      return false;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
index 8da1ea4..217b792 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
@@ -143,22 +143,28 @@ public class TestTableProvider extends ExecTest {
   static class TestOutput implements OutputMutator {
     List<ValueVector> vectors = new ArrayList<ValueVector>();
 
-    public void addField(ValueVector vector) throws SchemaChangeException {
-      vectors.add(vector);
-    }
-
     public Object get(int column, int row) {
       return vectors.get(column).getAccessor().getObject(row);
     }
 
-    public void removeField(MaterializedField field) {}
-    public void removeAllFields() {}
-    public void setNewSchema() {}
-
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, 
Class<T> clazz) throws SchemaChangeException {
       return null;
     }
+
+    @Override
+    public void addFields(List<ValueVector> vv) {
+      return;
+    }
+
+    @Override
+    public void allocate(int recordCount) {
+      
+    }
+    @Override
+    public boolean isNewSchema() {
+      return false;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 5d2c859..e594441 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -196,36 +196,33 @@ public class ParquetRecordReaderTest extends 
BaseTestQuery{
     List<MaterializedField> removedFields = Lists.newArrayList();
     List<ValueVector> addFields = Lists.newArrayList();
 
-    @Override
-    public void removeField(MaterializedField field) throws 
SchemaChangeException {
-      removedFields.add(field);
+
+    List<MaterializedField> getRemovedFields() {
+      return removedFields;
     }
 
-    @Override
-    public void addField(ValueVector vector) throws SchemaChangeException {
-      addFields.add(vector);
+    List<ValueVector> getAddFields() {
+      return addFields;
     }
 
     @Override
-    public void removeAllFields() {
-      addFields.clear();
+    public void addFields(List<ValueVector> vv) {
+      return;
     }
 
     @Override
-    public void setNewSchema() throws SchemaChangeException {
+    public <T extends ValueVector> T addField(MaterializedField field, 
Class<T> clazz) throws SchemaChangeException {
+      return null;
     }
 
-    List<MaterializedField> getRemovedFields() {
-      return removedFields;
-    }
+    @Override
+    public void allocate(int recordCount) {
 
-    List<ValueVector> getAddFields() {
-      return addFields;
     }
 
     @Override
-    public <T extends ValueVector> T addField(MaterializedField field, 
Class<T> clazz) throws SchemaChangeException {
-      return null;
+    public boolean isNewSchema() {
+      return false;
     }
   }
 

Reply via email to