Repository: drill
Updated Branches:
  refs/heads/master fe8471316 -> fe79a633a


DRILL-5830: Resolve regressions to MapR DB from DRILL-5546

- Back out HBase changes
- Code cleanup
- Test utilities
- Fix for DRILL-5829

closes #968


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/42f7af22
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/42f7af22
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/42f7af22

Branch: refs/heads/master
Commit: 42f7af22fc5d713aac07e057fd374ccd674e40df
Parents: fe84713
Author: Paul Rogers <prog...@maprtech.com>
Authored: Thu Sep 28 09:49:38 2017 -0700
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Wed Oct 11 13:14:56 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  95 ++---
 .../exec/store/hbase/HBaseRecordReader.java     |   6 +-
 .../drill/exec/store/hbase/HBaseScanSpec.java   |   1 -
 .../exec/store/hbase/HBaseSchemaFactory.java    |   4 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |   4 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |   9 +-
 .../org/apache/drill/hbase/BaseHBaseTest.java   |   2 +-
 .../exec/client/PrintingResultsListener.java    |  55 +--
 .../exec/physical/config/UnionExchange.java     |   7 +-
 .../UnorderedReceiverBatch.java                 |   5 +-
 .../UnorderedReceiverCreator.java               |   6 +-
 .../planner/logical/DrillPushProjIntoScan.java  |   1 -
 .../exec/planner/physical/ProjectPrel.java      |   2 +-
 .../apache/drill/exec/record/BatchSchema.java   |  15 +
 .../drill/exec/record/RecordBatchLoader.java    |  63 +++-
 .../drill/exec/record/VectorContainer.java      |   2 +-
 .../org/apache/drill/exec/util/VectorUtil.java  |  27 +-
 .../work/foreman/rm/DistributedQueryQueue.java  |   3 +
 .../drill/exec/record/vector/TestLoad.java      | 358 +++++++++++++++++--
 .../org/apache/drill/test/QueryBuilder.java     |   4 +
 .../apache/drill/test/QueryRowSetIterator.java  | 118 ++++++
 .../src/main/java/io/netty/buffer/DrillBuf.java |   6 +-
 .../drill/exec/record/MaterializedField.java    |  53 +++
 23 files changed, 691 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 1ee1da8..2b8cf18 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,17 +17,22 @@
  */
 package org.apache.drill.exec.store.hbase;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -50,24 +55,18 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements 
DrillHBaseConstants {
@@ -144,8 +143,8 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
     HBaseGroupScan newScan = new HBaseGroupScan(this);
-    newScan.columns = columns == null ? ALL_COLUMNS : columns;;
-    newScan.verifyColumnsAndConvertStar();
+    newScan.columns = columns == null ? ALL_COLUMNS : columns;
+    newScan.verifyColumns();
     return newScan;
   }
 
@@ -177,37 +176,19 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     } catch (IOException e) {
       throw new DrillRuntimeException("Error getting region info for table: " 
+ hbaseScanSpec.getTableName(), e);
     }
-    verifyColumnsAndConvertStar();
+    verifyColumns();
   }
 
-  private void verifyColumnsAndConvertStar() {
-    boolean hasStarCol = false;
-    LinkedHashSet<SchemaPath> requestedColumns = new LinkedHashSet<>();
-
+  private void verifyColumns() {
+    if (Utilities.isStarQuery(columns)) {
+      return;
+    }
     for (SchemaPath column : columns) {
-      // convert * into [row_key, cf1, cf2, ..., cf_n].
-      if (column.equals(Utilities.STAR_COLUMN)) {
-        hasStarCol = true;
-        Set<byte[]> families = hTableDesc.getFamiliesKeys();
-        requestedColumns.add(ROW_KEY_PATH);
-        for (byte[] family : families) {
-          SchemaPath colFamily = 
SchemaPath.getSimplePath(Bytes.toString(family));
-          requestedColumns.add(colFamily);
-        }
-      } else {
-        if (!(column.equals(ROW_KEY_PATH) ||
-            
hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
-          DrillRuntimeException.format("The column family '%s' does not exist 
in HBase table: %s .",
-              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
-        }
-        requestedColumns.add(column);
+      if (!(column.equals(ROW_KEY_PATH) || 
hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+        DrillRuntimeException.format("The column family '%s' does not exist in 
HBase table: %s .",
+            column.getRootSegment().getPath(), hTableDesc.getNameAsString());
       }
     }
-
-    // since star column has been converted, reset this.cloumns.
-    if (hasStarCol) {
-      this.columns = new ArrayList<>(requestedColumns);
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index d6c02b5..cae7ce4 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -126,12 +126,10 @@ public class HBaseRecordReader extends 
AbstractRecordReader implements DrillHBas
             HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), 
HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()));
       }
     } else {
-      throw new IllegalArgumentException("HBaseRecordReader does not allow 
column *. Column * should have been converted to list of <row_key, column 
family1, column family2, ..., column family_n");
-//      rowKeyOnly = false;
-//      transformed.add(ROW_KEY_PATH);
+      rowKeyOnly = false;
+      transformed.add(ROW_KEY_PATH);
     }
 
-
     return transformed;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
index f9a585e..797ec7f 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
@@ -93,5 +93,4 @@ public class HBaseScanSpec {
         + ", filter=" + (filter == null ? null : filter.toString())
         + "]";
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 56dfc10..548b679 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -94,7 +94,5 @@ public class HBaseSchemaFactory implements SchemaFactory {
     public String getTypeName() {
       return HBaseStoragePluginConfig.NAME;
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 81899cf..62f351c 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -154,7 +154,5 @@ public class HBaseStoragePlugin extends 
AbstractStoragePlugin {
     private HBaseStoragePlugin getHBaseStoragePlugin() {
       return HBaseStoragePlugin.this;
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 302ccca..0527391 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,7 +41,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 
-// Class containing information for reading a single HBase region
+/**
+ * Contains information for reading a single HBase region
+ */
+
 @JsonTypeName("hbase-region-scan")
 public class HBaseSubScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
@@ -210,12 +213,10 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
           + ", filter=" + (getScanFilter() == null ? null : 
getScanFilter().toString())
           + ", regionServer=" + regionServer + "]";
     }
-
   }
 
   @Override
   public int getOperatorType() {
     return CoreOperatorType.HBASE_SUB_SCAN_VALUE;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index e12c77c..b957347 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -38,7 +38,7 @@ import com.google.common.io.Files;
 
 public class BaseHBaseTest extends BaseTestQuery {
 
-  private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase";
+  public static final String HBASE_STORAGE_PLUGIN_NAME = "hbase";
 
   protected static Configuration conf = HBaseConfiguration.create();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index bdd2fab..c233837 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -71,35 +71,44 @@ public class PrintingResultsListener implements 
UserResultsListener {
   }
 
   @Override
+  @SuppressWarnings("resource")
   public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
     final QueryData header = result.getHeader();
     final DrillBuf data = result.getData();
 
-    if (data != null) {
-      count.addAndGet(header.getRowCount());
-      try {
-        loader.load(header.getDef(), data);
-        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-        // SchemaChangeException, so check/clean catch clause below.
-      } catch (SchemaChangeException e) {
-        submissionFailed(UserException.systemError(e).build(logger));
-      }
+    try {
+      if (data != null) {
+        count.addAndGet(header.getRowCount());
+        try {
+          loader.load(header.getDef(), data);
+          // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+          // SchemaChangeException, so check/clean catch clause below.
+        } catch (SchemaChangeException e) {
+          submissionFailed(UserException.systemError(e).build(logger));
+        }
 
-      switch(format) {
-        case TABLE:
-          VectorUtil.showVectorAccessibleContent(loader, columnWidth);
-          break;
-        case TSV:
-          VectorUtil.showVectorAccessibleContent(loader, "\t");
-          break;
-        case CSV:
-          VectorUtil.showVectorAccessibleContent(loader, ",");
-          break;
+        try {
+          switch(format) {
+            case TABLE:
+              VectorUtil.showVectorAccessibleContent(loader, columnWidth);
+              break;
+            case TSV:
+              VectorUtil.showVectorAccessibleContent(loader, "\t");
+              break;
+            case CSV:
+              VectorUtil.showVectorAccessibleContent(loader, ",");
+              break;
+            default:
+              throw new IllegalStateException(format.toString());
+          }
+        } finally {
+          loader.clear();
+        }
       }
-      loader.clear();
     }
-
-    result.release();
+    finally {
+      result.release();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index 318e6b1..c825fcb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,9 +33,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("union-exchange")
-public class UnionExchange extends AbstractExchange{
-
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionExchange.class);
+public class UnionExchange extends AbstractExchange {
 
   public UnionExchange(@JsonProperty("child") PhysicalOperator child) {
     super(child);
@@ -76,5 +74,4 @@ public class UnionExchange extends AbstractExchange{
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
     return new UnionExchange(child);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index be7d4ed..cfdc06d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -181,9 +181,6 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
         return IterOutcome.OUT_OF_MEMORY;
       }
 
-
-//      logger.debug("Next received batch {}", batch);
-
       final RecordBatchDef rbd = batch.getHeader().getDef();
       final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 649ecd9..6d4f1d7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,8 +28,8 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.batch.RawBatchBuffer;
 
 public class UnorderedReceiverCreator implements 
BatchCreator<UnorderedReceiver>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class);
 
+  @SuppressWarnings("resource")
   @Override
   public UnorderedReceiverBatch getBatch(FragmentContext context, 
UnorderedReceiver receiver, List<RecordBatch> children)
       throws ExecutionSetupException {
@@ -42,6 +42,4 @@ public class UnorderedReceiverCreator implements 
BatchCreator<UnorderedReceiver>
     RawBatchBuffer buffer = buffers[0];
     return new UnorderedReceiverBatch(context, buffer, receiver);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 33c840b..b15a843 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalProject;

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index d974bad..7f634c3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 63dcdb45..564aaed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -136,6 +136,21 @@ public class BatchSchema implements 
Iterable<MaterializedField> {
     return true;
   }
 
+  public boolean isEquivalent(BatchSchema other) {
+    if (fields == null || other.fields == null) {
+      return fields == other.fields;
+    }
+    if (fields.size() != other.fields.size()) {
+      return false;
+    }
+    for (int i = 0; i < fields.size(); i++) {
+      if (! fields.get(i).isEquivalent(other.fields.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * We treat fields with same set of Subtypes as equal, even if they are in a 
different order
    * @param t1

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 9a8483b..20b5cb5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.record;
 
 import io.netty.buffer.DrillBuf;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.common.StackTrace;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -38,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 
 
 /**
@@ -85,7 +87,7 @@ public class RecordBatchLoader implements VectorAccessible, 
Iterable<VectorWrapp
     // the schema has changed since the previous call.
 
     // Set up to recognize previous fields that no longer exist.
-    final Map<String, ValueVector> oldFields = Maps.newHashMap();
+    final Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap();
     for(final VectorWrapper<?> wrapper : container) {
       final ValueVector vector = wrapper.getValueVector();
       oldFields.put(vector.getField().getName(), vector);
@@ -109,6 +111,17 @@ public class RecordBatchLoader implements 
VectorAccessible, Iterable<VectorWrapp
           vector.clear();
           schemaChanged = true;
           vector = TypeHelper.getNewVector(fieldDef, allocator);
+
+        // If the field is a map, check if the map schema changed.
+
+        } else if (vector.getField().getType().getMinorType() == MinorType.MAP 
 &&
+                   ! isSameSchema(vector.getField().getChildren(), 
field.getChildList())) {
+
+          // The map schema changed. Discard the old map and create a new one.
+
+          schemaChanged = true;
+          vector.clear();
+          vector = TypeHelper.getNewVector(fieldDef, allocator);
         }
 
         // Load the vector.
@@ -121,7 +134,6 @@ public class RecordBatchLoader implements VectorAccessible, 
Iterable<VectorWrapp
         newVectors.add(vector);
       }
 
-
       // rebuild the schema.
       final SchemaBuilder builder = BatchSchema.newBuilder();
       for (final VectorWrapper<?> v : newVectors) {
@@ -150,6 +162,51 @@ public class RecordBatchLoader implements 
VectorAccessible, Iterable<VectorWrapp
     return schemaChanged;
   }
 
+  /**
+   * Check if two schemas are the same. The schemas, given as lists, represent 
the
+   * children of the original and new maps (AKA structures.)
+   *
+   * @param currentChildren current children of a Drill map
+   * @param newChildren new children, in an incoming batch, of the same
+   * Drill map
+   * @return true if the schemas are identical, false if a child is missing
+   * or has changed type or cardinality (AKA "mode").
+   */
+
+  private boolean isSameSchema(Collection<MaterializedField> currentChildren,
+      List<SerializedField> newChildren) {
+    if (currentChildren.size() != newChildren.size()) {
+      return false;
+    }
+
+    // Column order can permute (see DRILL-5828). So, use a map
+    // for matching.
+
+    Map<String, MaterializedField> childMap = CaseInsensitiveMap.newHashMap();
+    for (MaterializedField currentChild : currentChildren) {
+      childMap.put(currentChild.getName(), currentChild);
+    }
+    for (SerializedField newChild : newChildren) {
+      MaterializedField currentChild = 
childMap.get(newChild.getNamePart().getName());
+
+      // New map member?
+
+      if (currentChild == null) {
+        return false;
+      }
+
+      // Changed data type?
+
+      if (! currentChild.getType().equals(newChild.getMajorType())) {
+        return false;
+      }
+    }
+
+    // Everything matches.
+
+    return true;
+  }
+
   @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
     return container.getValueVectorId(path);

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 63cac7d..abcb846 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -134,7 +134,7 @@ public class VectorContainer implements VectorAccessible {
     return addOrGet(field, null);
   }
 
-  @SuppressWarnings({ "resource", "unchecked" })
+  @SuppressWarnings("unchecked")
   public <T extends ValueVector> T addOrGet(final MaterializedField field, 
final SchemaChangeCallBack callBack) {
     final TypedFieldId id = 
getValueVectorId(SchemaPath.getSimplePath(field.getName()));
     final ValueVector vector;

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index 4de4c2a..018653b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.util;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -42,7 +43,7 @@ public class VectorUtil {
     System.out.println(rows + " row(s):");
     List<String> columns = Lists.newArrayList();
     for (VectorWrapper<?> vw : va) {
-      columns.add(vw.getValueVector().getField().getName());
+      columns.add(formatFieldSchema(vw.getValueVector().getField()));
     }
 
     int width = columns.size();
@@ -80,6 +81,14 @@ public class VectorUtil {
     }
   }
 
+  public static String formatFieldSchema(MaterializedField field) {
+    String colName = field.getName() + "<" + field.getType().getMinorType() + 
"(" + field.getType().getMode() + ")" + ">";
+    if (field.getType().getMinorType() == MinorType.MAP) {
+      colName += expandMapSchema(field);
+    }
+    return colName;
+  }
+
   public static void appendVectorAccessibleContent(VectorAccessible va, 
StringBuilder formattedResults,
       final String delimiter, boolean includeHeader) {
     if (includeHeader) {
@@ -135,9 +144,8 @@ public class VectorUtil {
       int columnWidth = getColumnWidth(columnWidths, columnIndex);
       width += columnWidth + 2;
       formats.add("| %-" + columnWidth + "s");
-      MaterializedField field = vw.getValueVector().getField();
-      columns.add(field.getName() + "<" + field.getType().getMinorType() + "(" 
+ field.getType().getMode() + ")" + ">");
       columnIndex++;
+      columns.add(formatFieldSchema(vw.getValueVector().getField()));
     }
 
     int rows = va.getRecordCount();
@@ -180,6 +188,19 @@ public class VectorUtil {
     }
   }
 
+  private static String expandMapSchema(MaterializedField mapField) {
+    StringBuilder buf = new StringBuilder();
+    buf.append("{");
+    String sep = "";
+    for (MaterializedField field : mapField.getChildren()) {
+      buf.append(sep);
+      sep = ",";
+      buf.append(formatFieldSchema(field));
+    }
+    buf.append("}");
+    return buf.toString();
+  }
+
   public static void allocateVectors(Iterable<ValueVector> valueVectors, int 
count) {
     for (final ValueVector v : valueVectors) {
       AllocationHelper.allocateNew(v, count);

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
index f4c5536..73e8de6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
@@ -183,6 +183,9 @@ public class DistributedQueryQueue implements QueryQueue {
      */
 
     public boolean isSameAs(ConfigSet otherSet) {
+      if (otherSet == null) {
+        return false;
+      }
       return queueThreshold == otherSet.queueThreshold &&
              queueTimeout == otherSet.queueTimeout &&
              largeQueueSize == otherSet.largeQueueSize &&

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
index 4febe1d..fed2914 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
@@ -18,69 +18,72 @@
 package org.apache.drill.exec.record.vector;
 
 import static org.junit.Assert.assertEquals;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.categories.VectorTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.junit.Test;
-
-import com.google.common.collect.Lists;
 import org.junit.experimental.categories.Category;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
 @Category(VectorTest.class)
 public class TestLoad extends ExecTest {
   private final DrillConfig drillConfig = DrillConfig.create();
 
+  @SuppressWarnings("resource")
   @Test
   public void testLoadValueVector() throws Exception {
     final BufferAllocator allocator = 
RootAllocatorFactory.newRoot(drillConfig);
-    final ValueVector fixedV = new IntVector(MaterializedField.create("ints",
-        Types.required(MinorType.INT)), allocator);
-    final ValueVector varlenV = new VarCharVector(MaterializedField.create(
-        "chars", Types.required(MinorType.VARCHAR)), allocator);
-    final ValueVector nullableVarlenV = new 
NullableVarCharVector(MaterializedField.create("chars",
-        Types.optional(MinorType.VARCHAR)), allocator);
-
-    final List<ValueVector> vectors = Lists.newArrayList(fixedV, varlenV, 
nullableVarlenV);
-    for (final ValueVector v : vectors) {
-      AllocationHelper.allocate(v, 100, 50);
-      v.getMutator().generateTestData(100);
-    }
+    BatchSchema schema = new SchemaBuilder()
+        .add("ints", MinorType.INT)
+        .add("chars", MinorType.VARCHAR)
+        .addNullable("chars2", MinorType.VARCHAR)
+        .build();
 
-    final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, 
vectors, false);
-    final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
-    final ByteBuf[] byteBufs = writableBatch.getBuffers();
-    int bytes = 0;
-    for (ByteBuf buf : byteBufs) {
-      bytes += buf.writerIndex();
-    }
-    final DrillBuf byteBuf = allocator.buffer(bytes);
-    int index = 0;
-    for (ByteBuf buf : byteBufs) {
-      buf.readBytes(byteBuf, index, buf.writerIndex());
-      index += buf.writerIndex();
-    }
-    byteBuf.writerIndex(bytes);
+    // Create vectors
+
+    final List<ValueVector> vectors = createVectors(allocator, schema, 100);
+
+    // Writeable batch now owns vector buffers
 
+     final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, 
vectors, false);
+
+     // Serialize the vectors
+
+    final DrillBuf byteBuf = serializeBatch(allocator, writableBatch);
+
+    // Batch loader does NOT take ownership of the serialized buffer
+
+    final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
     batchLoader.load(writableBatch.getDef(), byteBuf);
+
+    // Release the serialized buffer.
+
+    byteBuf.release();
+
+    // TODO: Replace this with actual validation, not just dumping to the 
console.
+
     boolean firstColumn = true;
     int recordCount = 0;
     for (final VectorWrapper<?> v : batchLoader) {
@@ -122,7 +125,294 @@ public class TestLoad extends ExecTest {
       }
     }
     assertEquals(100, recordCount);
+
+    // Free the original vectors
+
+    writableBatch.clear();
+
+    // Free the deserialized vectors
+
     batchLoader.clear();
+
+    // The allocator will verify that the frees were done correctly.
+
+    allocator.close();
+  }
+
+  // TODO: Replace this low-level code with RowSet usage once
+  // DRILL-5657 is committed to master.
+
+  private static List<ValueVector> createVectors(BufferAllocator allocator, 
BatchSchema schema, int i) {
+    final List<ValueVector> vectors = new ArrayList<>();
+    for (MaterializedField field : schema) {
+      @SuppressWarnings("resource")
+      ValueVector v = TypeHelper.getNewVector(field, allocator);
+      AllocationHelper.allocate(v, 100, 50);
+      v.getMutator().generateTestData(100);
+      vectors.add(v);
+    }
+    return vectors;
+  }
+
+  private static DrillBuf serializeBatch(BufferAllocator allocator, 
WritableBatch writableBatch) {
+    final ByteBuf[] byteBufs = writableBatch.getBuffers();
+    int bytes = 0;
+    for (ByteBuf buf : byteBufs) {
+      bytes += buf.writerIndex();
+    }
+    final DrillBuf byteBuf = allocator.buffer(bytes);
+    int index = 0;
+    for (ByteBuf buf : byteBufs) {
+      buf.readBytes(byteBuf, index, buf.writerIndex());
+      index += buf.writerIndex();
+    }
+    byteBuf.writerIndex(bytes);
+    return byteBuf;
+  }
+
  /**
   * Simulates loading one batch: builds test vectors for the given
   * schema, wraps them in a writable batch, serializes that batch into
   * a single buffer, then loads the serialized form into the batch
   * loader under test.
   *
   * @param allocator a memory allocator
   * @param batchLoader the batch loader under test
   * @param schema the schema of the new batch
   * @return true if the loader reported a schema change relative to the
   * previous batch, false if the schema is unchanged
   * @throws SchemaChangeException should not occur
   */

  @SuppressWarnings("resource")
  private boolean loadBatch(BufferAllocator allocator,
      final RecordBatchLoader batchLoader,
      BatchSchema schema) throws SchemaChangeException {
    final List<ValueVector> vectors = createVectors(allocator, schema, 100);
    // The writable batch takes ownership of the vectors' buffers.
    final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false);
    final DrillBuf byteBuf = serializeBatch(allocator, writableBatch);
    boolean result = batchLoader.load(writableBatch.getDef(), byteBuf);
    // The loader does not take ownership of the serialized buffer;
    // release it and the original batch here.
    byteBuf.release();
    writableBatch.clear();
    return result;
  }
+
  /**
   * Verifies that {@link RecordBatchLoader#load} correctly reports
   * schema changes for flat (non-map) schemas: initial load, identical
   * schema, column reordering, dropped column, added column, changed
   * column type, and an empty schema.
   */
  @Test
  public void testSchemaChange() throws SchemaChangeException {
    final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig);
    final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);

    // Initial schema: a: INT, b: VARCHAR
    // Schema change: N/A

    BatchSchema schema1 = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .build();
    {
      assertTrue(loadBatch(allocator, batchLoader, schema1));
      assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Same schema
    // Schema change: No

    {
      assertFalse(loadBatch(allocator, batchLoader, schema1));
      assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Reverse columns: b: VARCHAR, a: INT
    // Schema change: No

    {
      BatchSchema schema = new SchemaBuilder()
          .add("b", MinorType.VARCHAR)
          .add("a", MinorType.INT)
          .build();
      assertFalse(loadBatch(allocator, batchLoader, schema));

      // Potential bug: see DRILL-5828

      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Drop a column: a: INT
    // Schema change: Yes

    {
      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Add a column: a: INT, b: VARCHAR, c: INT
    // Schema change: Yes
    // First restore the base two-column schema, then add column c.

    {
      assertTrue(loadBatch(allocator, batchLoader, schema1));
      assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();

      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .add("b", MinorType.VARCHAR)
          .add("c", MinorType.INT)
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Change a column type: a: INT, b: VARCHAR, c: VARCHAR
    // Schema change: Yes

    {
      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .add("b", MinorType.VARCHAR)
          .add("c", MinorType.VARCHAR)
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Empty schema
    // Schema change: Yes

    {
      BatchSchema schema = new SchemaBuilder()
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    batchLoader.clear();
    allocator.close();
  }
+
  /**
   * Verifies that {@link RecordBatchLoader#load} correctly reports
   * schema changes involving MAP columns: empty map, column added to a
   * map, identical map schema, column added/dropped inside the map,
   * changed child type, back to an empty map, and the map dropped
   * entirely.
   */
  @Test
  public void testMapSchemaChange() throws SchemaChangeException {
    final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig);
    final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);

    // Initial schema: a: INT, m: MAP{}

    BatchSchema schema1 = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addMap("m")
          .buildMap()
        .build();
    {
      assertTrue(loadBatch(allocator, batchLoader, schema1));
      assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Same schema
    // Schema change: No

    {
      assertFalse(loadBatch(allocator, batchLoader, schema1));
      assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Add column to map: a: INT, m: MAP{b: VARCHAR}
    // Schema change: Yes

    BatchSchema schema2 = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addMap("m")
          .add("b", MinorType.VARCHAR)
          .buildMap()
        .build();
    {
      assertTrue(loadBatch(allocator, batchLoader, schema2));
      assertTrue(schema2.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Same schema
    // Schema change: No

    {
      assertFalse(loadBatch(allocator, batchLoader, schema2));
      assertTrue(schema2.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Add column:  a: INT, m: MAP{b: VARCHAR, c: INT}
    // Schema change: Yes

    {
      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .addMap("m")
            .add("b", MinorType.VARCHAR)
            .add("c", MinorType.INT)
            .buildMap()
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Drop a column:  a: INT, m: MAP{b: VARCHAR}
    // Schema change: Yes

    {
      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .addMap("m")
            .add("b", MinorType.VARCHAR)
            .buildMap()
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Change type:  a: INT, m: MAP{b: INT}
    // Schema change: Yes

    {
      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .addMap("m")
            .add("b", MinorType.INT)
            .buildMap()
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Empty map: a: INT, m: MAP{}

    {
      assertTrue(loadBatch(allocator, batchLoader, schema1));
      assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    // Drop map: a: INT

    {
      BatchSchema schema = new SchemaBuilder()
          .add("a", MinorType.INT)
          .build();
      assertTrue(loadBatch(allocator, batchLoader, schema));
      assertTrue(schema.isEquivalent(batchLoader.getSchema()));
      batchLoader.getContainer().zeroVectors();
    }

    batchLoader.clear();
    allocator.close();
  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 37fcdfd..e7bf61f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -345,6 +345,10 @@ public class QueryBuilder {
     }
   }
 
+  public QueryRowSetIterator rowSetIterator( ) {
+    return new QueryRowSetIterator(client.allocator(), withEventListener());
+  }
+
   /**
    * Run the query that is expected to return (at least) one row
    * with the only (or first) column returning a long value.

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
new file mode 100644
index 0000000..c329690
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+import org.apache.drill.test.rowSet.DirectRowSet;
+
+public class QueryRowSetIterator implements Iterator<DirectRowSet>, 
Iterable<DirectRowSet> {
+  private final BufferingQueryEventListener listener;
+  private int recordCount = 0;
+  private int batchCount = 0;
+  QueryId queryId = null;
+  private BufferAllocator allocator;
+  private QueryDataBatch batch;
+  private QueryState state;
+
+  QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener 
listener) {
+    this.allocator = allocator;
+    this.listener = listener;
+  }
+
+  public QueryId queryId() { return queryId; }
+  public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
+  public QueryState finalState() { return state; }
+  public int batchCount() { return batchCount; }
+  public int rowCount() { return recordCount; }
+
+  @Override
+  public boolean hasNext() {
+    for ( ; ; ) {
+      QueryEvent event = listener.get();
+      state = event.state;
+      batch = null;
+      switch (event.type)
+      {
+      case BATCH:
+        batchCount++;
+        recordCount += event.batch.getHeader().getRowCount();
+        batch = event.batch;
+        return true;
+      case EOF:
+        state = event.state;
+        return false;
+      case ERROR:
+        throw new RuntimeException(event.error);
+      case QUERY_ID:
+        queryId = event.queryId;
+        break;
+      default:
+        throw new IllegalStateException("Unexpected event: " + event.type);
+      }
+    }
+  }
+
+  @Override
+  public DirectRowSet next() {
+
+    if (batch == null) {
+      throw new IllegalStateException();
+    }
+
+    // Unload the batch and convert to a row set.
+
+    final RecordBatchLoader loader = new RecordBatchLoader(allocator);
+    try {
+      loader.load(batch.getHeader().getDef(), batch.getData());
+      batch.release();
+      batch = null;
+      VectorContainer container = loader.getContainer();
+      container.setRecordCount(loader.getRecordCount());
+      return new DirectRowSet(allocator, container);
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public void printAll() {
+    for (DirectRowSet rowSet : this) {
+      rowSet.print();
+      rowSet.clear();
+    }
+  }
+
+  @Override
+  public Iterator<DirectRowSet> iterator() {
+    return this;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java 
b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index a9feafd..5029c56 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -52,7 +52,7 @@ public final class DrillBuf extends AbstractByteBuf 
implements AutoCloseable {
   private final int offset;
   private final BufferLedger ledger;
   private final BufferManager bufManager;
-  private final ByteBufAllocator alloc;
+//  private final ByteBufAllocator alloc;
   private final boolean isEmpty;
   private volatile int length;
   private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
@@ -72,7 +72,7 @@ public final class DrillBuf extends AbstractByteBuf 
implements AutoCloseable {
     this.udle = byteBuf;
     this.isEmpty = isEmpty;
     this.bufManager = manager;
-    this.alloc = alloc;
+//    this.alloc = alloc;
     this.addr = byteBuf.memoryAddress() + offset;
     this.ledger = ledger;
     this.length = length;

http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java 
b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index e2b44a7..4d29d55 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.exec.proto.UserBitShared.NamePart;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
@@ -86,6 +87,7 @@ public class MaterializedField {
     children.add(field);
   }
 
+  @Override
   public MaterializedField clone() {
     return withPathAndType(name, getType());
   }
@@ -168,6 +170,57 @@ public class MaterializedField {
             Objects.equals(this.type, other.type);
   }
 
  /**
   * Reports whether two fields are logically equivalent: same
   * case-insensitive name, same minor type, mode, scale and precision,
   * and, for MAP fields, equivalent children in identical order.
   * Unset type fields compare as their defaults (0), unlike the
   * protobuf-generated equality which distinguishes set from unset.
   *
   * @param other the field to compare against
   * @return true if the two fields are equivalent
   */
  public boolean isEquivalent(MaterializedField other) {
    if (! name.equalsIgnoreCase(other.name)) {
      return false;
    }

    // Requires full type equality, including fields such as precision and scale.
    // But, unset fields are equivalent to 0. Can't use the protobuf-provided
    // isEquals(), that treats set and unset fields as different.

    if (type.getMinorType() != other.type.getMinorType()) {
      return false;
    }
    if (type.getMode() != other.type.getMode()) {
      return false;
    }
    if (type.getScale() != other.type.getScale()) {
      return false;
    }
    if (type.getPrecision() != other.type.getPrecision()) {
      return false;
    }

    // Compare children -- but only for maps, not the internal children
    // for Varchar, repeated or nullable types.

    if (type.getMinorType() != MinorType.MAP) {
      return true;
    }

    // Both null, or both the same object, counts as equivalent.
    if (children == null  ||  other.children == null) {
      return children == other.children;
    }
    if (children.size() != other.children.size()) {
      return false;
    }

    // Maps are name-based, not position. But, for our
    // purposes, we insist on identical ordering.

    // Sizes are equal (checked above), so pairwise iteration is safe.
    Iterator<MaterializedField> thisIter = children.iterator();
    Iterator<MaterializedField> otherIter = other.children.iterator();
    while (thisIter.hasNext()) {
      MaterializedField thisChild = thisIter.next();
      MaterializedField otherChild = otherIter.next();
      if (! thisChild.isEquivalent(otherChild)) {
        return false;
      }
    }
    return true;
  }
+
   /**
    * <p>Creates materialized field string representation.
    * Includes field name, its type with precision and scale if any and data 
mode.

Reply via email to