DRILL-825: MaterializedField is mutable and is not suitable as a KEY in a MAP

+ Minor optimization/cleanup in HBaseRecordReader

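For context on the fix: MaterializedField bases equals() and hashCode() on its
mutable children list (see the hashCode() hunk below), so a field that is
mutated after being inserted into a HashMap can no longer be looked up. Below
is a minimal sketch of the general hazard, using a hypothetical MutableField
class rather than the real Drill one:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for MaterializedField: hashCode() covers a mutable list.
    class MutableField {
      final String path;
      final List<String> children = new ArrayList<>();

      MutableField(String path) { this.path = path; }

      @Override
      public int hashCode() { return 31 * path.hashCode() + children.hashCode(); }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof MutableField)) return false;
        MutableField other = (MutableField) o;
        return path.equals(other.path) && children.equals(other.children);
      }
    }

    public class MutableKeyHazard {
      public static void main(String[] args) {
        Map<MutableField, String> map = new HashMap<>();
        MutableField field = new MutableField("f");
        map.put(field, "vector");

        field.children.add("child"); // mutating the key changes its hash code

        // HashMap probes the wrong bucket (and its cached hash no longer
        // matches), so the entry is effectively lost:
        System.out.println(map.get(field)); // prints: null
      }
    }

The commit's answer is the MaterializedField.Key inner class introduced below,
whose hashCode() depends only on path and type, both assigned exactly once in
the constructor.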

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cd7aeebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cd7aeebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cd7aeebe

Branch: refs/heads/master
Commit: cd7aeebe65ccb4e8829282aa6e6cae6ed867796b
Parents: 84390e8
Author: Aditya Kishore <[email protected]>
Authored: Thu May 22 18:37:59 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu May 22 19:22:21 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/DrillHBaseConstants.java   |   9 ++
 .../exec/store/hbase/HBaseRecordReader.java     |  17 ++-
 .../drill/exec/physical/impl/ScanBatch.java     |  36 +++---
 .../drill/exec/record/MaterializedField.java    | 110 +++++++++++++------
 4 files changed, 113 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index a86797b..5ba9979 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -18,6 +18,9 @@
 package org.apache.drill.exec.store.hbase;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 
 public interface DrillHBaseConstants {
   static final String ROW_KEY = "row_key";
@@ -26,4 +29,10 @@ public interface DrillHBaseConstants {
 
   static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
 
+  static final MajorType ROW_KEY_TYPE = Types.required(MinorType.VARBINARY);
+
+  static final MajorType COLUMN_FAMILY_TYPE = Types.required(MinorType.MAP);
+
+  static final MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 439f97f..caee8ed 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -58,13 +58,12 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
-
+  
   private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 
   private Map<String, MapVector> familyVectorMap;
   private VarBinaryVector rowKeyVector;
-  private SchemaPath rowKeySchemaPath;
 
   private HTable hTable;
   private ResultScanner resultScanner;
@@ -86,8 +85,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       while(columnIterator.hasNext()) {
         SchemaPath column = columnIterator.next();
         if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) {
-          rowKeySchemaPath = ROW_KEY_PATH;
-          this.columns.add(rowKeySchemaPath);
+          this.columns.add(ROW_KEY_PATH);
           continue;
         }
         rowKeyOnly = false;
@@ -104,8 +102,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       }
     } else {
       rowKeyOnly = false;
-      rowKeySchemaPath = ROW_KEY_PATH;
-      this.columns.add(rowKeySchemaPath);
+      this.columns.add(ROW_KEY_PATH);
     }
 
     hbaseScan.setFilter(subScanSpec.getScanFilter());
@@ -130,8 +127,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     try {
       // Add Vectors to output in the order specified when creating reader
       for (SchemaPath column : columns) {
-        if (column.equals(rowKeySchemaPath)) {
-          MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
+        if (column.equals(ROW_KEY_PATH)) {
+          MaterializedField field = MaterializedField.create(column, ROW_KEY_TYPE);
           rowKeyVector = outputMutator.addField(field, VarBinaryVector.class);
         } else {
           getOrCreateFamilyVector(column.getRootSegment().getPath(), false);
@@ -216,7 +213,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       MapVector v = familyVectorMap.get(familyName);
       if(v == null) {
         SchemaPath column = SchemaPath.getSimplePath(familyName);
-        MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.MAP));
+        MaterializedField field = MaterializedField.create(column, COLUMN_FAMILY_TYPE);
         v = outputMutator.addField(field, MapVector.class);
         if (allocateOnCreate) {
           v.allocateNew();
@@ -232,7 +229,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
 
   private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qualifier) {
     int oldSize = mv.size();
-    NullableVarBinaryVector v = mv.addOrGet(qualifier, Types.optional(TypeProtos.MinorType.VARBINARY), NullableVarBinaryVector.class);
+    NullableVarBinaryVector v = mv.addOrGet(qualifier, COLUMN_TYPE, NullableVarBinaryVector.class);
     if (oldSize != mv.size()) {
       v.allocateNew();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c0810c6..7febb10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -22,48 +22,49 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.util.BatchPrinter;
-import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 /**
  * Record batch used for a particular scan. Operates against one or more record readers.
  */
 public class ScanBatch implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
   private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
-  final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
-  final Map<MaterializedField, Class<?>> fieldVectorClassMap = Maps.newHashMap();
   private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
 
+  private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
+
   private final VectorContainer container = new VectorContainer();
   private int recordCount;
   private final FragmentContext context;
@@ -74,8 +75,8 @@ public class ScanBatch implements RecordBatch {
   private final Mutator mutator = new Mutator();
   private Iterator<String[]> partitionColumns;
   private String[] partitionValues;
-  List<ValueVector> partitionVectors;
-  List<Integer> selectedPartitionColumns;
+  private List<ValueVector> partitionVectors;
+  private List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
@@ -95,7 +96,7 @@ public class ScanBatch implements RecordBatch {
   }
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
-    this(subScanConfig, context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
+    this(subScanConfig, context, readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
   }
 
   @Override
@@ -224,14 +225,14 @@ public class ScanBatch implements RecordBatch {
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
       // Check if the field exists
-      ValueVector v = fieldVectorMap.get(field);
+      ValueVector v = fieldVectorMap.get(field.key());
 
       if (v == null || v.getClass() != clazz) {
         // Field does not exist add it to the map and the output container
         v = TypeHelper.getNewVector(field, oContext.getAllocator());
         if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
         container.add(v);
-        fieldVectorMap.put(field, v);
+        fieldVectorMap.put(field.key(), v);
 
         // Adding new vectors to the container marks that the schema has changed
         schemaChange = true;
@@ -243,7 +244,7 @@ public class ScanBatch implements RecordBatch {
     @Override
     public void addFields(List<ValueVector> vvList) {
       for (ValueVector v : vvList) {
-        fieldVectorMap.put(v.getField(), v);
+        fieldVectorMap.put(v.getField().key(), v);
         container.add(v);
       }
       schemaChange = true;
@@ -278,6 +279,7 @@ public class ScanBatch implements RecordBatch {
 
   public void cleanup(){
     container.clear();
+    fieldVectorMap.clear();
     oContext.close();
   }
 

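A side note on the constructor change above: Collections.EMPTY_LIST is a raw
List, so binding it to List<String[]> compiles only with an unchecked-conversion
warning, and under pre-Java-8 inference rules a bare Collections.emptyList()
passed as an argument would not infer the element type from the parameter. A
small sketch of the three forms (the take() method is hypothetical, standing in
for the two list parameters of ScanBatch's constructor):

    import java.util.Collections;
    import java.util.List;

    public class EmptyListForms {
      // Hypothetical method mirroring ScanBatch's list parameters.
      static void take(List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) { }

      public static void main(String[] args) {
        // Raw constant: compiles, but with an unchecked-conversion warning.
        List<String[]> raw = Collections.EMPTY_LIST;

        // Bare emptyList() infers List<Object> as an argument on Java 7:
        // take(Collections.emptyList(), Collections.emptyList()); // compile error pre-Java 8

        // Explicit type witnesses, as the patch uses: type-safe and warning-free.
        take(Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
      }
    }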
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 3d749d6..48073c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.PathSegment;
@@ -30,15 +31,13 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 
 import com.google.hive12.common.collect.Lists;
 
-public class MaterializedField{
-  private SchemaPath path;
-  private MajorType type;
+public class MaterializedField {
+  private Key key;
   private List<MaterializedField> children = Lists.newArrayList();
 
   private MaterializedField(SchemaPath path, MajorType type) {
     super();
-    this.path = path;
-    this.type = type;
+    key = new Key(path, type);
   }
 
   public static MaterializedField create(SerializedField serField){
@@ -47,8 +46,8 @@ public class MaterializedField{
 
   public SerializedField.Builder getAsBuilder(){
     return SerializedField.newBuilder() //
-        .setMajorType(type) //
-        .setNamePart(path.getAsNamePart());
+        .setMajorType(key.type) //
+        .setNamePart(key.path.getAsNamePart());
   }
 
   public void addChild(MaterializedField field){
@@ -56,11 +55,11 @@ public class MaterializedField{
   }
 
   public MaterializedField clone(FieldReference ref){
-    return create(ref, type);
+    return create(ref, key.type);
   }
 
   public String getLastName(){
-    PathSegment seg = path.getRootSegment();
+    PathSegment seg = key.path.getRootSegment();
     while(seg.getChild() != null) seg = seg.getChild();
     return seg.getNameSegment().getPath();
   }
@@ -82,7 +81,7 @@ public class MaterializedField{
   }
 
   public SchemaPath getPath(){
-    return path;
+    return key.path;
   }
 
   /**
@@ -91,7 +90,7 @@ public class MaterializedField{
    */
   @Deprecated
   public SchemaPath getAsSchemaPath(){
-    return path;
+    return getPath();
   }
 
 //  public String getName(){
@@ -116,29 +115,29 @@ public class MaterializedField{
 //  }
 
   public int getWidth() {
-    return type.getWidth();
+    return key.type.getWidth();
   }
 
   public MajorType getType() {
-    return type;
+    return key.type;
   }
 
   public int getScale() {
-      return type.getScale();
+      return key.type.getScale();
   }
   public int getPrecision() {
-      return type.getPrecision();
+      return key.type.getPrecision();
   }
   public boolean isNullable() {
-    return type.getMode() == DataMode.OPTIONAL;
+    return key.type.getMode() == DataMode.OPTIONAL;
   }
 
   public DataMode getDataMode() {
-    return type.getMode();
+    return key.type.getMode();
   }
 
   public MaterializedField getOtherNullableVersion(){
-    MajorType mt = type;
+    MajorType mt = key.type;
     DataMode newDataMode = null;
     switch(mt.getMode()){
     case OPTIONAL:
@@ -150,7 +149,7 @@ public class MaterializedField{
     default:
       throw new UnsupportedOperationException();
     }
-    return new MaterializedField(path, mt.toBuilder().setMode(newDataMode).build());
+    return new MaterializedField(key.path, mt.toBuilder().setMode(newDataMode).build());
   }
 
   public Class<?> getValueClass() {
@@ -160,7 +159,7 @@ public class MaterializedField{
   public boolean matches(SchemaPath path) {
     if(!path.isSimplePath()) return false;
 
-    return this.path.equals(path);
+    return key.path.equals(path);
   }
 
 
@@ -169,8 +168,7 @@ public class MaterializedField{
     final int prime = 31;
     int result = 1;
     result = prime * result + ((children == null) ? 0 : children.hashCode());
-    result = prime * result + ((path == null) ? 0 : path.hashCode());
-    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    result = prime * result + ((key == null) ? 0 : key.hashCode());
     return result;
   }
 
@@ -188,25 +186,73 @@ public class MaterializedField{
         return false;
     } else if (!children.equals(other.children))
       return false;
-    if (path == null) {
-      if (other.path != null)
+    if (key == null) {
+      if (other.key != null)
         return false;
-    } else if (!path.equals(other.path))
-      return false;
-    if (type == null) {
-      if (other.type != null)
-        return false;
-    } else if (!type.equals(other.type))
+    } else if (!key.equals(other.key))
       return false;
     return true;
   }
 
   @Override
   public String toString() {
-    return "MaterializedField [path=" + path + ", type=" + Types.toString(type) + "]";
+    return "MaterializedField [path=" + key.path + ", type=" + Types.toString(key.type) + "]";
+  }
+
+  public Key key() {
+    return key;
   }
 
   public String toExpr(){
-    return path.toExpr();
+    return key.path.toExpr();
+  }
+
+  /**
+   * Since the {@code MaterializedField} itself is mutable, in certain cases, it is not suitable
+   * as a key of a {@link Map}. This inner class allows the {@link MaterializedField} object to be
+   * used for this purpose.
+   */
+  public class Key {
+
+    private SchemaPath path;
+    private MajorType type;
+
+    private Key(SchemaPath path, MajorType type) {
+      this.path = path;
+      this.type = type;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((path == null) ? 0 : path.hashCode());
+      result = prime * result + ((type == null) ? 0 : type.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      Key other = (Key) obj;
+      if (path == null) {
+        if (other.path != null)
+          return false;
+      } else if (!path.equals(other.path))
+        return false;
+      if (type == null) {
+        if (other.type != null)
+          return false;
+      } else if (!type.equals(other.type))
+        return false;
+      return true;
+    }
+
   }
+
 }
\ No newline at end of file
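
Taken together, the intended contract after this change: two fields with the
same path and type always share an equal Key, even after their children
diverge, which is what ScanBatch's fieldVectorMap relies on. A hedged sketch of
the usage (path, type, and child stand for values built elsewhere; this is not
a compilable unit):

    MaterializedField f1 = MaterializedField.create(path, type);
    MaterializedField f2 = MaterializedField.create(path, type);
    f2.addChild(child); // mutates f2, so f1 and f2 are no longer equal...

    boolean fieldsEqual = f1.equals(f2);             // false: equals() still compares children
    boolean keysEqual   = f1.key().equals(f2.key()); // true:  Key compares only path and type
    boolean hashStable  = f1.key().hashCode()
                       == f2.key().hashCode();       // true:  stable under addChild()

    // ...so a map keyed on key() keeps working across mutations:
    //   fieldVectorMap.put(f2.key(), vector);
    //   f2.addChild(anotherChild);
    //   fieldVectorMap.get(f2.key()); // still finds the vector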
