This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 0928427  DRILL-7418: MetadataDirectGroupScan improvements
0928427 is described below

commit 09284270b85d35f7a1d521539c42876bb613a6e3
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
AuthorDate: Tue Oct 22 17:33:04 2019 +0300

    DRILL-7418: MetadataDirectGroupScan improvements
    
    1. Replaced files listing with selection root information to reduce query 
plan size in MetadataDirectGroupScan.
    2. Fixed MetadataDirectGroupScan ser / de issues.
    3. Added PlanMatcher to QueryBuilder for more convenient plan matching.
    4. Re-written TestConvertCountToDirectScan to use ClusterTest.
    5. Refactoring and code clean up.
---
 .../exec/TestHiveDrillNativeParquetReader.java     |  19 +-
 .../exec/physical/base/AbstractGroupScan.java      |   5 +
 .../apache/drill/exec/physical/base/GroupScan.java |   9 +-
 .../apache/drill/exec/physical/base/ScanStats.java |  46 ++-
 .../logical/ConvertCountToDirectScanRule.java      |  27 +-
 .../physical/ConvertCountToDirectScanPrule.java    |  17 +-
 .../drill/exec/store/direct/DirectGroupScan.java   |  24 +-
 .../exec/store/direct/MetadataDirectGroupScan.java |  49 ++-
 .../parquet/AbstractParquetScanBatchCreator.java   |   6 +-
 .../drill/TestFunctionsWithTypeExpoQueries.java    |   2 +-
 .../logical/TestConvertCountToDirectScan.java      | 444 ++++++++++++---------
 .../java/org/apache/drill/test/ClientFixture.java  |  76 ++--
 .../java/org/apache/drill/test/QueryBuilder.java   | 152 +++++--
 13 files changed, 530 insertions(+), 346 deletions(-)

diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index 5490640..6b9a7cd 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -64,8 +64,7 @@ public class TestHiveDrillNativeParquetReader extends 
HiveTestBase {
     int actualRowCount = testSql(query);
     assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
 
-    testPlanMatchingPatterns(query,
-        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", 
"numFiles=1");
   }
 
   @Test
@@ -75,8 +74,7 @@ public class TestHiveDrillNativeParquetReader extends 
HiveTestBase {
     int actualRowCount = testSql(query);
     assertEquals("Expected and actual row count should match", 1, 
actualRowCount);
 
-    testPlanMatchingPatterns(query,
-        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", 
"numFiles=1");
   }
 
   @Test
@@ -114,15 +112,14 @@ public class TestHiveDrillNativeParquetReader extends 
HiveTestBase {
     int actualRowCount = testSql(query);
     assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
 
-    testPlanMatchingPatterns(query,
-        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", 
"numFiles=1");
   }
 
   @Test
   public void testPartitionedExternalTable() throws Exception {
     String query = "select * from hive.kv_native_ext";
 
-    testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", 
"numFiles=2"}, null);
+    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", 
"numFiles=2");
 
     testBuilder()
         .sqlQuery(query)
@@ -185,14 +182,16 @@ public class TestHiveDrillNativeParquetReader extends 
HiveTestBase {
     int actualRowCount = testSql(query);
     assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
 
-    testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", 
"numFiles=1"}, null);
+    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", 
"numFiles=1");
   }
 
   @Test
   public void testConvertCountToDirectScanOptimization() throws Exception {
     String query = "select count(1) as cnt from hive.kv_native";
 
-    testPlanMatchingPatterns(query, new String[]{"DynamicPojoRecordReader"}, 
null);
+    testPlanMatchingPatterns(query, "DynamicPojoRecordReader");
+
+    testPhysicalPlanExecutionBasedOnQuery(query);
 
     testBuilder()
       .sqlQuery(query)
@@ -224,7 +223,7 @@ public class TestHiveDrillNativeParquetReader extends 
HiveTestBase {
   public void testReadAllSupportedHiveDataTypesNativeParquet() throws 
Exception {
     String query = "select * from hive.readtest_parquet";
 
-    testPlanMatchingPatterns(query, new String[] 
{"HiveDrillNativeParquetScan"}, null);
+    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan");
 
     testBuilder()
         .sqlQuery(query)
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 4916370..bc21550 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -174,6 +174,11 @@ public abstract class AbstractGroupScan extends 
AbstractBase implements GroupSca
   }
 
   @Override
+  public Path getSelectionRoot() {
+    return null;
+  }
+
+  @Override
   public Collection<Path> getFiles() {
     return null;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 1ba1dd9..ebbb717 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -141,8 +141,15 @@ public interface GroupScan extends Scan, HasAffinity {
   boolean hasFiles();
 
   /**
+   * Returns path to the selection root. If this GroupScan cannot provide 
selection root, it returns null.
+   *
+   * @return path to the selection root
+   */
+  Path getSelectionRoot();
+
+  /**
    * Returns a collection of file names associated with this GroupScan. This 
should be called after checking
-   * hasFiles().  If this GroupScan cannot provide file names, it returns null.
+   * hasFiles(). If this GroupScan cannot provide file names, it returns null.
    *
    * @return collection of files paths
    */
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
index 721f723..596dc5b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
@@ -17,22 +17,44 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 public class ScanStats {
 
   public static final ScanStats TRIVIAL_TABLE = new 
ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1);
 
   public static final ScanStats ZERO_RECORD_TABLE = new 
ScanStats(GroupScanProperty.EXACT_ROW_COUNT, 0, 1, 1);
 
+  @JsonProperty
+  private final GroupScanProperty groupScanProperty;
+  @JsonProperty
   private final double recordCount;
+  @JsonProperty
   private final double cpuCost;
+  @JsonProperty
   private final double diskCost;
-  private final GroupScanProperty property;
 
-  public ScanStats(GroupScanProperty property, double recordCount, double 
cpuCost, double diskCost) {
+  @JsonCreator
+  public ScanStats(@JsonProperty("groupScanProperty") GroupScanProperty 
groupScanProperty,
+                   @JsonProperty("recordCount") double recordCount,
+                   @JsonProperty("cpuCost") double cpuCost,
+                   @JsonProperty("diskCost") double diskCost) {
+    this.groupScanProperty = groupScanProperty;
     this.recordCount = recordCount;
     this.cpuCost = cpuCost;
     this.diskCost = diskCost;
-    this.property = property;
+  }
+
+  /**
+   * Return if GroupScan knows the exact row count in the result of getSize() 
call.
+   * By default, group scan does not know the exact row count, before it scans 
every rows.
+   * Currently, parquet group scan will return the exact row count.
+   *
+   * @return group scan property
+   */
+  public GroupScanProperty getGroupScanProperty() {
+    return groupScanProperty;
   }
 
   public double getRecordCount() {
@@ -49,20 +71,14 @@ public class ScanStats {
 
   @Override
   public String toString() {
-    return "ScanStats{" + "recordCount=" + recordCount + ", cpuCost=" + 
cpuCost + ", diskCost=" + diskCost + ", property=" + property + '}';
+    return "ScanStats{" +
+      "recordCount=" + recordCount +
+      ", cpuCost=" + cpuCost +
+      ", diskCost=" + diskCost +
+      ", groupScanProperty=" + groupScanProperty +
+      '}';
   }
 
-  /**
-   * Return if GroupScan knows the exact row count in the result of getSize() 
call.
-   * By default, groupscan does not know the exact row count, before it scans 
every rows.
-   * Currently, parquet group scan will return the exact row count.
-   */
-  public GroupScanProperty getGroupScanProperty() {
-    return property;
-  }
-
-
-
   public enum GroupScanProperty {
     NO_EXACT_ROW_COUNT(false, false),
     EXACT_ROW_COUNT(true, true);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
index fb1bd2f..28c23f0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
@@ -48,14 +48,14 @@ import 
org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
 import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.Set;
@@ -99,9 +99,9 @@ public class ConvertCountToDirectScanRule extends RelOptRule {
       RelOptHelper.some(Aggregate.class,
                             RelOptHelper.any(TableScan.class)), "Agg_on_scan");
 
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
 
-  protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
+  private ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
     super(rule, "ConvertCountToDirectScanRule:" + id);
   }
 
@@ -153,7 +153,7 @@ public class ConvertCountToDirectScanRule extends 
RelOptRule {
 
     Metadata_V4.MetadataSummary metadataSummary = status.getRight();
     Map<String, Long> result = collectCounts(settings, metadataSummary, agg, 
scan, project);
-    logger.trace("Calculated the following aggregate counts: ", result);
+    logger.trace("Calculated the following aggregate counts: {}", result);
 
     // if counts could not be determined, rule won't be applied
     if (result.isEmpty()) {
@@ -161,17 +161,16 @@ public class ConvertCountToDirectScanRule extends 
RelOptRule {
       return;
     }
 
-    List<Path> fileList =
-            
ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot()));
+    Path summaryFileName = 
Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot());
 
     final RelDataType scanRowType = 
CountToDirectScanUtils.constructDataType(agg, result.keySet());
 
     final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
         CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
-        Collections.singletonList((List<Long>) new 
ArrayList<>(result.values())));
+        Collections.singletonList(new ArrayList<>(result.values())));
 
     final ScanStats scanStats = new 
ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, 
scanRowType.getFieldCount());
-    final MetadataDirectGroupScan directScan = new 
MetadataDirectGroupScan(reader, fileList, scanStats, true);
+    final MetadataDirectGroupScan directScan = new 
MetadataDirectGroupScan(reader, summaryFileName, 1, scanStats, true);
 
     final DrillDirectScanRel newScan = new 
DrillDirectScanRel(scan.getCluster(), 
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
       directScan, scanRowType);
@@ -190,16 +189,16 @@ public class ConvertCountToDirectScanRule extends 
RelOptRule {
     if (!((formatConfig instanceof ParquetFormatConfig)
       || ((formatConfig instanceof NamedFormatPluginConfig)
       && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
-      return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, 
null);
+      return new ImmutablePair<>(false, null);
     }
 
     FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
-    DrillFileSystem fs = null;
+    DrillFileSystem fs;
     try {
        fs = new 
DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
     } catch (IOException e) {
       logger.warn("Unable to create the file system object for retrieving 
statistics from metadata cache file ", e);
-      return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, 
null);
+      return new ImmutablePair<>(false, null);
     }
 
     // check if the cacheFileRoot has been set: this is needed because after 
directory pruning, the
@@ -215,8 +214,7 @@ public class ConvertCountToDirectScanRule extends 
RelOptRule {
 
     Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs, 
selectionRoot, false, parquetReaderConfig);
 
-    return metadataSummary != null ? new ImmutablePair<Boolean, 
Metadata_V4.MetadataSummary>(true, metadataSummary) :
-      new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+    return metadataSummary != null ? new ImmutablePair<>(true, 
metadataSummary) : new ImmutablePair<>(false, null);
   }
 
   /**
@@ -311,5 +309,4 @@ public class ConvertCountToDirectScanRule extends 
RelOptRule {
 
     return ImmutableMap.copyOf(result);
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
index ac8f3ca..0900ba7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import java.util.List;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -44,6 +43,8 @@ import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.direct.MetadataDirectGroupScan;
 import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -89,9 +90,9 @@ public class ConvertCountToDirectScanPrule extends Prule {
       RelOptHelper.some(DrillAggregateRel.class,
                             RelOptHelper.any(DrillScanRel.class)), 
"Agg_on_scan");
 
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanPrule.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(ConvertCountToDirectScanPrule.class);
 
-  protected ConvertCountToDirectScanPrule(RelOptRuleOperand rule, String id) {
+  private ConvertCountToDirectScanPrule(RelOptRuleOperand rule, String id) {
     super(rule, "ConvertCountToDirectScanPrule:" + id);
   }
 
@@ -125,10 +126,11 @@ public class ConvertCountToDirectScanPrule extends Prule {
 
     final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
         CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
-        Collections.singletonList((List<Long>) new 
ArrayList<>(result.values())));
+        Collections.singletonList(new ArrayList<>(result.values())));
 
     final ScanStats scanStats = new 
ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, 
scanRowType.getFieldCount());
-    final GroupScan directScan = new MetadataDirectGroupScan(reader, 
oldGrpScan.getFiles(), scanStats, false);
+    final int numFiles = oldGrpScan.hasFiles() ? oldGrpScan.getFiles().size() 
: -1;
+    final GroupScan directScan = new MetadataDirectGroupScan(reader, 
oldGrpScan.getSelectionRoot(), numFiles, scanStats, false);
 
     final DirectScanPrel newScan = DirectScanPrel.create(scan, 
scan.getTraitSet().plus(Prel.DRILL_PHYSICAL)
         .plus(DrillDistributionTrait.SINGLETON), directScan, scanRowType);
@@ -145,7 +147,7 @@ public class ConvertCountToDirectScanPrule extends Prule {
    *
    * For each aggregate call will determine if count can be calculated. 
Collects counts only for COUNT function.
    * For star, not null expressions and implicit columns sets count to total 
record number.
-   * For other cases obtains counts from group scan operator. Also count can 
not be calculated for parition columns.
+   * For other cases obtains counts from group scan operator. Also count can 
not be calculated for partition columns.
    *
    * @param agg aggregate relational expression
    * @param scan scan relational expression
@@ -155,7 +157,7 @@ public class ConvertCountToDirectScanPrule extends Prule {
   private Map<String, Long> collectCounts(PlannerSettings settings, 
DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
     final Set<String> implicitColumnsNames = 
ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
     final GroupScan oldGrpScan = scan.getGroupScan();
-    final long totalRecordCount = 
(long)oldGrpScan.getScanStats(settings).getRecordCount();
+    final long totalRecordCount = (long) 
oldGrpScan.getScanStats(settings).getRecordCount();
     final LinkedHashMap<String, Long> result = new LinkedHashMap<>();
 
     for (int i = 0; i < agg.getAggCallList().size(); i++) {
@@ -218,5 +220,4 @@ public class ConvertCountToDirectScanPrule extends Prule {
 
     return ImmutableMap.copyOf(result);
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 67b2e5c..6c49943 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -17,10 +17,11 @@
  */
 package org.apache.drill.exec.store.direct;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -34,26 +35,30 @@ import java.util.List;
 @JsonTypeName("direct-scan")
 public class DirectGroupScan extends AbstractGroupScan {
 
+  @JsonProperty
   protected final RecordReader reader;
+  @JsonProperty
   protected final ScanStats stats;
 
   public DirectGroupScan(RecordReader reader) {
     this(reader, ScanStats.TRIVIAL_TABLE);
   }
 
-  public DirectGroupScan(RecordReader reader, ScanStats stats) {
+  @JsonCreator
+  public DirectGroupScan(@JsonProperty("reader") RecordReader reader,
+                         @JsonProperty("stats") ScanStats stats) {
     super((String) null);
     this.reader = reader;
     this.stats = stats;
   }
 
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws 
PhysicalOperatorSetupException {
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
     assert endpoints.size() == 1;
   }
 
   @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws 
ExecutionSetupException {
+  public SubScan getSpecificScan(int minorFragmentId) {
     assert minorFragmentId == 0;
     return new DirectSubScan(reader);
   }
@@ -68,8 +73,14 @@ public class DirectGroupScan extends AbstractGroupScan {
     return stats;
   }
 
+  @JsonIgnore
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
+  public List<SchemaPath> getColumns() {
+    return super.getColumns();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     assert children == null || children.isEmpty();
     return new DirectGroupScan(reader, stats);
   }
@@ -83,5 +94,4 @@ public class DirectGroupScan extends AbstractGroupScan {
   public GroupScan clone(List<SchemaPath> columns) {
     return this;
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
index 4fea456..da63f80 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.direct;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
@@ -25,7 +27,6 @@ import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.hadoop.fs.Path;
 
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -37,20 +38,34 @@ import java.util.List;
 @JsonTypeName("metadata-direct-scan")
 public class MetadataDirectGroupScan extends DirectGroupScan {
 
-  private final Collection<Path> files;
-  private boolean usedMetadataSummaryFile = false;
+  @JsonProperty
+  private final Path selectionRoot;
+  @JsonProperty
+  private final int numFiles;
+  @JsonProperty
+  private boolean usedMetadataSummaryFile;
 
-  public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files, 
ScanStats stats,
-                                 boolean usedMetadataSummaryFile) {
+  @JsonCreator
+  public MetadataDirectGroupScan(@JsonProperty("reader") RecordReader reader,
+                                 @JsonProperty("selectionRoot") Path 
selectionRoot,
+                                 @JsonProperty("numFiles") int numFiles,
+                                 @JsonProperty("stats") ScanStats stats,
+                                 @JsonProperty("usedMetadataSummaryFile") 
boolean usedMetadataSummaryFile) {
     super(reader, stats);
-    this.files = files;
+    this.selectionRoot = selectionRoot;
+    this.numFiles = numFiles;
     this.usedMetadataSummaryFile = usedMetadataSummaryFile;
   }
 
   @Override
+  public Path getSelectionRoot() {
+    return selectionRoot;
+  }
+
+  @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     assert children == null || children.isEmpty();
-    return new MetadataDirectGroupScan(reader, files, stats, 
usedMetadataSummaryFile);
+    return new MetadataDirectGroupScan(reader, selectionRoot, numFiles, stats, 
usedMetadataSummaryFile);
   }
 
   @Override
@@ -61,25 +76,25 @@ public class MetadataDirectGroupScan extends 
DirectGroupScan {
   /**
    * <p>
    * Returns string representation of group scan data.
-   * Includes list of files if present.
+   * Includes selection root, number of files, if metadata summary file was 
used,
+   * such data is present.
    * </p>
    *
    * <p>
-   * Example: [files = [/tmp/0_0_0.parquet], numFiles = 1]
+   * Example: [selectionRoot = [/tmp/users], numFiles = 1, 
usedMetadataSummaryFile = false]
    * </p>
    *
    * @return string representation of group scan data
    */
   @Override
   public String getDigest() {
-    if (files != null) {
-      StringBuilder builder = new StringBuilder();
-      builder.append("files = ").append(files).append(", ");
-      builder.append("numFiles = ").append(files.size()).append(", ");
-      builder.append("usedMetadataSummaryFile = 
").append(usedMetadataSummaryFile).append(", ");
-      return builder.append(super.getDigest()).toString();
+    StringBuilder builder = new StringBuilder();
+    if (selectionRoot != null) {
+      builder.append("selectionRoot = ").append(selectionRoot).append(", ");
     }
-    return super.getDigest();
+    builder.append("numFiles = ").append(numFiles).append(", ");
+    builder.append("usedMetadataSummaryFile = 
").append(usedMetadataSummaryFile).append(", ");
+    builder.append(super.getDigest());
+    return builder.toString();
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index eae151e..838180d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -273,8 +273,8 @@ public abstract class AbstractParquetScanBatchCreator {
    * @param rowGroup create a reader for this specific row group
    * @param fs file system
    * @param footer this file's footer
-   * // @param readSchemaOnly - if true sets the number of rows to read to be 
zero
-   * @return the (possibly modified) input  mapWithMaxColumns
+   * @param readSchemaOnly if true sets the number of rows to read to be zero
+   * @return the (possibly modified) input mapWithMaxColumns
    */
   private Map<String, String> 
createReaderAndImplicitColumns(ExecutorFragmentContext context,
                                                              
AbstractParquetRowGroupScan rowGroupScan,
@@ -347,7 +347,7 @@ public abstract class AbstractParquetScanBatchCreator {
   /**
    * Helper class responsible for creating and managing DrillFileSystem.
    */
-  protected abstract class AbstractDrillFileSystemManager {
+  protected static abstract class AbstractDrillFileSystemManager {
 
     protected final OperatorContext operatorContext;
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
 
b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
index 207638d..0c892da 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
@@ -277,7 +277,7 @@ public class TestFunctionsWithTypeExpoQueries extends 
BaseTestQuery {
         "where concat(a, 'asdf') = 'asdf'";
 
     // Validate the plan
-    final String[] expectedPlan = {"Scan.*a.parquet.*numFiles = 1"};
+    final String[] expectedPlan = {"Scan.*metadata_caching.*numFiles = 1"};
     final String[] excludedPlan = {"Filter"};
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
index c35ab2d..ae33f0a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
@@ -17,364 +17,426 @@
  */
 package org.apache.drill.exec.planner.logical;
 
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.PlannerTest;
+import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.nio.file.Paths;
 
-@Category(PlannerTest.class)
-public class TestConvertCountToDirectScan extends PlanTestBase {
+import static org.junit.Assert.assertEquals;
+
+@Category({PlannerTest.class, UnlikelyTest.class})
+public class TestConvertCountToDirectScan extends ClusterTest {
 
   @BeforeClass
-  public static void setupTestFiles() {
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
     dirTestWatcher.copyResourceToRoot(Paths.get("directcount.parquet"));
+    startCluster(builder);
   }
 
   @Test
-  public void ensureCaseDoesNotConvertToDirectScan() throws Exception {
-    testPlanMatchingPatterns(
-        "select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then 
n_nationkey else null end) as cnt\n" +
-            "from dfs.`directcount.parquet`", new String[]{"CASE"});
+  public void testCaseDoesNotConvertToDirectScan() throws Exception {
+    queryBuilder()
+      .sql("select " +
+      "count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey 
else null end) as cnt " +
+      "from dfs.`directcount.parquet`")
+      .planMatcher()
+      .include("CASE")
+      .match();
   }
 
   @Test
-  public void ensureConvertSimpleCountToDirectScan() throws Exception {
+  public void testConvertSimpleCountToDirectScan() throws Exception {
     String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
-    testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("DynamicPojoRecordReader")
+      .match();
 
     testBuilder()
-        .sqlQuery(sql)
-        .unOrdered()
-        .baselineColumns("cnt")
-        .baselineValues(25L)
-        .go();
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(25L)
+      .go();
   }
 
   @Test
-  public void ensureConvertSimpleCountConstToDirectScan() throws Exception {
+  public void testConvertSimpleCountConstToDirectScan() throws Exception {
     String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`";
-    testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("DynamicPojoRecordReader")
+      .match();
 
     testBuilder()
-        .sqlQuery(sql)
-        .unOrdered()
-        .baselineColumns("cnt")
-        .baselineValues(25L)
-        .go();
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(25L)
+      .go();
   }
 
   @Test
-  public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception 
{
+  public void testConvertSimpleCountConstExprToDirectScan() throws Exception {
     String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`";
-    testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("DynamicPojoRecordReader")
+      .match();
 
     testBuilder()
-        .sqlQuery(sql)
-        .unOrdered()
-        .baselineColumns("cnt")
-        .baselineValues(25L)
-        .go();
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(25L)
+      .go();
   }
 
   @Test
-  public void ensureDoesNotConvertForDirectoryColumns() throws Exception {
+  public void testDoesNotConvertForDirectoryColumns() throws Exception {
     String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`";
-    testPlanMatchingPatterns(sql, new String[]{"ParquetGroupScan"});
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("ParquetGroupScan")
+      .match();
 
     testBuilder()
-        .sqlQuery(sql)
-        .unOrdered()
-        .baselineColumns("cnt")
-        .baselineValues(0L)
-        .go();
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(0L)
+      .go();
   }
 
   @Test
-  public void ensureConvertForImplicitColumns() throws Exception {
+  public void testConvertForImplicitColumns() throws Exception {
     String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`";
-    testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("DynamicPojoRecordReader")
+      .match();
 
     testBuilder()
-        .sqlQuery(sql)
-        .unOrdered()
-        .baselineColumns("cnt")
-        .baselineValues(25L)
-        .go();
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(25L)
+      .go();
   }
 
   @Test
   public void ensureConvertForSeveralColumns() throws Exception {
-    test("use dfs.tmp");
+    run("use dfs.tmp");
     String tableName = "parquet_table_counts";
 
     try {
       String newFqnColumnName = "new_fqn";
-      test("alter session set `%s` = '%s'", 
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName);
-      test("create table %s as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
-      test("refresh table metadata %s", tableName);
+      client.alterSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, 
newFqnColumnName);
+      run("create table %s as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select\n" +
-          "count(%s) as implicit_count,\n" +
-          "count(*) as star_count,\n" +
-          "count(col_int) as int_column_count,\n" +
-          "count(col_vrchr) as vrchr_column_count\n" +
-          "from %s", newFqnColumnName, tableName);
-
-      testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+        "count(%s) as implicit_count,\n" +
+        "count(*) as star_count,\n" +
+        "count(col_int) as int_column_count,\n" +
+        "count(col_vrchr) as vrchr_column_count\n" +
+        "from %s", newFqnColumnName, tableName);
+
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-          .sqlQuery(sql)
-          .unOrdered()
-          .baselineColumns("implicit_count", "star_count", "int_column_count", 
"vrchr_column_count")
-          .baselineValues(6L, 6L, 2L, 3L)
-          .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("implicit_count", "star_count", "int_column_count", 
"vrchr_column_count")
+        .baselineValues(6L, 6L, 2L, 3L)
+        .go();
 
     } finally {
-      test("alter session reset `%s`", 
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL);
-      test("drop table if exists %s", tableName);
+      client.resetSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
-  public void ensureCorrectCountWithMissingStatistics() throws Exception {
-    test("use dfs.tmp");
+  public void testCorrectCountWithMissingStatistics() throws Exception {
+    run("use dfs.tmp");
     String tableName = "wide_str_table";
     try {
       // table will contain two partitions: one - with null value, second - 
with non null value
-      test("create table %s partition by (col_str) as select * from 
cp.`parquet/wide_string.parquet`", tableName);
+      run("create table %s partition by (col_str) as select * from 
cp.`parquet/wide_string.parquet`", tableName);
 
-      String query = String.format("select count(col_str) as cnt_str, count(*) 
as cnt_total from %s", tableName);
+      String sql = String.format("select count(col_str) as cnt_str, count(*) 
as cnt_total from %s", tableName);
 
       // direct scan should not be applied since we don't have statistics
-      testPlanMatchingPatterns(query, null, new 
String[]{"DynamicPojoRecordReader"});
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .exclude("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-        .sqlQuery(query)
+        .sqlQuery(sql)
         .unOrdered()
         .baselineColumns("cnt_str", "cnt_total")
         .baselineValues(1L, 2L)
         .go();
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
   public void testCountsWithMetadataCacheSummary() throws Exception {
-    test("use dfs.tmp");
+    run("use dfs.tmp");
+
     String tableName = "parquet_table_counts";
 
     try {
-      test(String.format("create table `%s/1` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/2` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/3` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/4` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
+      run("create table `%s/1` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/2` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/3` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/4` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
 
-      test("refresh table metadata %s", tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select\n" +
-              "count(*) as star_count,\n" +
-              "count(col_int) as int_column_count,\n" +
-              "count(col_vrchr) as vrchr_column_count\n" +
-              "from %s", tableName);
-
-      int expectedNumFiles = 1;
-      String numFilesPattern = "numFiles = " + expectedNumFiles;
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
-      String recordReaderPattern = "DynamicPojoRecordReader";
-
-      testPlanMatchingPatterns(sql, new String[]{numFilesPattern, 
usedMetaSummaryPattern, recordReaderPattern});
+        "count(*) as star_count,\n" +
+        "count(col_int) as int_column_count,\n" +
+        "count(col_vrchr) as vrchr_column_count\n" +
+        "from %s", tableName);
+
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("numFiles = 1")
+        .include("usedMetadataSummaryFile = true")
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-          .sqlQuery(sql)
-          .unOrdered()
-          .baselineColumns("star_count", "int_column_count", 
"vrchr_column_count")
-          .baselineValues(24L, 8L, 12L)
-          .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("star_count", "int_column_count", 
"vrchr_column_count")
+        .baselineValues(24L, 8L, 12L)
+        .go();
 
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
   public void testCountsWithMetadataCacheSummaryAndDirPruning() throws 
Exception {
-    test("use dfs.tmp");
+    run("use dfs.tmp");
     String tableName = "parquet_table_counts";
 
     try {
-      test(String.format("create table `%s/1` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/2` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/3` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/4` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
+      run("create table `%s/1` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/2` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/3` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/4` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
 
-      test("refresh table metadata %s", tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select\n" +
-              "count(*) as star_count,\n" +
-              "count(col_int) as int_column_count,\n" +
-              "count(col_vrchr) as vrchr_column_count\n" +
-              "from %s where dir0 = 1 ", tableName);
-
-      int expectedNumFiles = 1;
-      String numFilesPattern = "numFiles = " + expectedNumFiles;
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
-      String recordReaderPattern = "DynamicPojoRecordReader";
-
-      testPlanMatchingPatterns(sql, new String[]{numFilesPattern, 
usedMetaSummaryPattern, recordReaderPattern});
+        "count(*) as star_count,\n" +
+        "count(col_int) as int_column_count,\n" +
+        "count(col_vrchr) as vrchr_column_count\n" +
+        "from %s where dir0 = 1 ", tableName);
+
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("numFiles = 1")
+        .include("usedMetadataSummaryFile = true")
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-          .sqlQuery(sql)
-          .unOrdered()
-          .baselineColumns("star_count", "int_column_count", 
"vrchr_column_count")
-          .baselineValues(6L, 2L, 3L)
-          .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("star_count", "int_column_count", 
"vrchr_column_count")
+        .baselineValues(6L, 2L, 3L)
+        .go();
 
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
   public void testCountsWithWildCard() throws Exception {
-    test("use dfs.tmp");
+    run("use dfs.tmp");
     String tableName = "parquet_table_counts";
 
     try {
       for (int i = 0; i < 10; i++) {
-        test(String.format("create table `%s/12/%s` as select * from 
cp.`tpch/nation.parquet`", tableName, i));
+        run("create table `%s/12/%s` as select * from 
cp.`tpch/nation.parquet`", tableName, i);
       }
-      test(String.format("create table `%s/2` as select * from 
cp.`tpch/nation.parquet`", tableName));
-      test(String.format("create table `%s/2/11` as select * from 
cp.`tpch/nation.parquet`", tableName));
-      test(String.format("create table `%s/2/12` as select * from 
cp.`tpch/nation.parquet`", tableName));
+      run("create table `%s/2` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("create table `%s/2/11` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("create table `%s/2/12` as select * from cp.`tpch/nation.parquet`", 
tableName);
 
-      test("refresh table metadata %s", tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select\n" +
-              "count(*) as star_count\n" +
-              "from `%s/1*`", tableName);
-
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = false";
-      String recordReaderPattern = "DynamicPojoRecordReader";
+        "count(*) as star_count\n" +
+        "from `%s/1*`", tableName);
 
-      testPlanMatchingPatterns(sql, new String[]{usedMetaSummaryPattern, 
recordReaderPattern});
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("usedMetadataSummaryFile = false")
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-          .sqlQuery(sql)
-          .unOrdered()
-          .baselineColumns("star_count")
-          .baselineValues(250L)
-          .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("star_count")
+        .baselineValues(250L)
+        .go();
 
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
   public void testCountsForLeafDirectories() throws Exception {
-    test("use dfs.tmp");
+    run("use dfs.tmp");
     String tableName = "parquet_table_counts";
 
     try {
-      test("create table `%s/1` as select * from cp.`tpch/nation.parquet`", 
tableName);
-      test("create table `%s/2` as select * from cp.`tpch/nation.parquet`", 
tableName);
-      test("create table `%s/3` as select * from cp.`tpch/nation.parquet`", 
tableName);
-      test("refresh table metadata %s", tableName);
+      run("create table `%s/1` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("create table `%s/2` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("create table `%s/3` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select\n" +
-              "count(*) as star_count\n" +
-              "from `%s/1`", tableName);
-
-      int expectedNumFiles = 1;
-      String numFilesPattern = "numFiles = " + expectedNumFiles;
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
-      String recordReaderPattern = "DynamicPojoRecordReader";
+        "count(*) as star_count\n" +
+        "from `%s/1`", tableName);
 
-      testPlanMatchingPatterns(sql, new String[]{numFilesPattern, 
usedMetaSummaryPattern, recordReaderPattern});
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("numFiles = 1")
+        .include("usedMetadataSummaryFile = true")
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-          .sqlQuery(sql)
-          .unOrdered()
-          .baselineColumns("star_count")
-          .baselineValues(25L)
-          .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("star_count")
+        .baselineValues(25L)
+        .go();
 
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
   public void testCountsForDirWithFilesAndDir() throws Exception {
-    test("use dfs.tmp");
+    run("use dfs.tmp");
     String tableName = "parquet_table_counts";
 
     try {
-      test("create table `%s/1` as select * from cp.`tpch/nation.parquet`", 
tableName);
-      test("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`", 
tableName);
-      test("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`", 
tableName);
-      test("refresh table metadata %s", tableName);
+      run("create table `%s/1` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`", 
tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select count(*) as star_count from `%s/1`", 
tableName);
 
-      int expectedNumFiles = 1;
-      String numFilesPattern = "numFiles = " + expectedNumFiles;
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
-      String recordReaderPattern = "DynamicPojoRecordReader";
-
-      testPlanMatchingPatterns(sql, new String[]{numFilesPattern, 
usedMetaSummaryPattern, recordReaderPattern});
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("numFiles = 1")
+        .include("usedMetadataSummaryFile = true")
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-          .sqlQuery(sql)
-          .unOrdered()
-          .baselineColumns("star_count")
-          .baselineValues(75L)
-          .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("star_count")
+        .baselineValues(75L)
+        .go();
 
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
 
   @Test
-  public void testCountsWithNonExColumn() throws Exception {
-    test("use dfs.tmp");
+  public void testCountsWithNonExistingColumn() throws Exception {
+    run("use dfs.tmp");
     String tableName = "parquet_table_counts_nonex";
 
     try {
-      test(String.format("create table `%s/1` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/2` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/3` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
-      test(String.format("create table `%s/4` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName));
+      run("create table `%s/1` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/2` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/3` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
+      run("create table `%s/4` as select * from 
cp.`parquet/alltypes_optional.parquet`", tableName);
 
-      test("refresh table metadata %s", tableName);
+      run("refresh table metadata %s", tableName);
 
       String sql = String.format("select\n" +
-              "count(*) as star_count,\n" +
-              "count(col_int) as int_column_count,\n" +
-              "count(col_vrchr) as vrchr_column_count,\n" +
-              "count(non_existent) as non_existent\n" +
-              "from %s", tableName);
-
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
-      String recordReaderPattern = "DynamicPojoRecordReader";
-
-      testPlanMatchingPatterns(sql, new String[]{usedMetaSummaryPattern, 
recordReaderPattern});
+        "count(*) as star_count,\n" +
+        "count(col_int) as int_column_count,\n" +
+        "count(col_vrchr) as vrchr_column_count,\n" +
+        "count(non_existent) as non_existent\n" +
+        "from %s", tableName);
+
+      queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("numFiles = 1")
+        .include("usedMetadataSummaryFile = true")
+        .include("DynamicPojoRecordReader")
+        .match();
 
       testBuilder()
-              .sqlQuery(sql)
-              .unOrdered()
-              .baselineColumns("star_count", "int_column_count", 
"vrchr_column_count", "non_existent" )
-              .baselineValues(24L, 8L, 12L, 0L)
-              .go();
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("star_count", "int_column_count", 
"vrchr_column_count", "non_existent" )
+        .baselineValues(24L, 8L, 12L, 0L)
+        .go();
 
     } finally {
-      test("drop table if exists %s", tableName);
+      run("drop table if exists %s", tableName);
     }
   }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match", 25L, cnt);
+  }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 17b838c..bbf684e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -19,7 +19,6 @@ package org.apache.drill.test;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
@@ -43,6 +42,8 @@ import org.apache.drill.exec.testing.ControlsInjectionUtil;
 import org.apache.drill.test.ClusterFixture.FixtureTestServices;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents a Drill client. Provides many useful test-specific operations 
such
@@ -52,7 +53,8 @@ import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
  */
 
 public class ClientFixture implements AutoCloseable {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ClientFixture.class);
 
   public static class ClientBuilder {
 
@@ -119,11 +121,9 @@ public class ClientFixture implements AutoCloseable {
   /**
    * Set a runtime option.
    *
-   * @param key
-   * @param value
-   * @throws RpcException
+   * @param key option name
+   * @param value option value
    */
-
   public void alterSession(String key, Object value) {
     String sql = "ALTER SESSION SET `" + key + "` = " + 
ClusterFixture.stringify(value);
     runSqlSilently(sql);
@@ -136,9 +136,8 @@ public class ClientFixture implements AutoCloseable {
 
   /**
    * Reset a system option
-   * @param key
+   * @param key option name
    */
-
   public void resetSession(String key) {
     runSqlSilently("ALTER SESSION RESET `" + key + "`");
   }
@@ -148,19 +147,19 @@ public class ClientFixture implements AutoCloseable {
   }
 
   /**
-   * Run SQL silently (discard results.)
+   * Run SQL silently (discard results).
    *
-   * @param sql
-   * @throws RpcException
+   * @param sql query
+   * @param args format params
+   * @throws IllegalStateException if something goes wrong
    */
-
-  public void runSqlSilently(String sql) {
+  public void runSqlSilently(String sql, Object... args) {
     try {
-      queryBuilder().sql(sql).run();
+      queryBuilder().sql(sql, args).run();
     } catch (Exception e) {
       // Should not fail during tests. Convert exception to unchecked
       // to simplify test code.
-      new IllegalStateException(e);
+      throw new IllegalStateException(e);
     }
   }
 
@@ -182,9 +181,11 @@ public class ClientFixture implements AutoCloseable {
 
   /**
    * Run zero or more queries and output the results in TSV format.
+   *
+   * @param queryString query string
+   * @param print if query result should be printed
    */
-  private void runQueriesAndOutput(final String queryString,
-                                   final boolean print) throws Exception {
+  private void runQueriesAndOutput(final String queryString, final boolean 
print) {
     final String query = QueryTestUtil.normalizeQuery(queryString);
     String[] queries = query.split(";");
     for (String q : queries) {
@@ -203,24 +204,30 @@ public class ClientFixture implements AutoCloseable {
 
   /**
    * Run zero or more queries and log the output in TSV format.
+   *
+   * @param queryString query string
    */
-  public void runQueriesAndLog(final String queryString) throws Exception {
+  public void runQueriesAndLog(final String queryString) {
     runQueriesAndOutput(queryString, false);
   }
 
   /**
    * Run zero or more queries and print the output in TSV format.
+   *
+   * @param queryString query string
    */
-  public void runQueriesAndPrint(final String queryString) throws Exception {
+  public void runQueriesAndPrint(final String queryString) {
     runQueriesAndOutput(queryString, true);
   }
 
   /**
    * Plan a query without execution.
-   * @throws ExecutionException
-   * @throws InterruptedException
+   *
+   * @param type query type
+   * @param query query string
+   * @param isSplitPlan option to tell whether to return single or split plans 
for a query
+   * @return query plan fragments
    */
-
   public QueryPlanFragments planQuery(QueryType type, String query, boolean 
isSplitPlan) {
     DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = 
client.planQuery(type, query, isSplitPlan);
     try {
@@ -251,11 +258,10 @@ public class ClientFixture implements AutoCloseable {
    * Return a parsed query profile for a query summary. Saving of profiles
    * must be turned on.
    *
-   * @param summary
-   * @return
-   * @throws IOException
+   * @param summary query summary
+   * @return profile parser
+   * @throws IOException if unable to parse query profile
    */
-
   public ProfileParser parseProfile(QuerySummary summary) throws IOException {
     return parseProfile(summary.queryIdString());
   }
@@ -264,9 +270,11 @@ public class ClientFixture implements AutoCloseable {
    * Parse a query profile from the local storage location given the
    * query ID. Saving of profiles must be turned on. This is a bit of
    * a hack: the profile should be available directly from the server.
-   * @throws IOException
+   *
+   * @param queryId query ID
+   * @return profile parser
+   * @throws IOException if unable to parse the profile
    */
-
   public ProfileParser parseProfile(String queryId) throws IOException {
     File file = new File(cluster.getProfileDir(), queryId + ".sys.drill");
     return new ProfileParser(file);
@@ -281,7 +289,6 @@ public class ClientFixture implements AutoCloseable {
    * @param controls the controls string created by
    * {@link org.apache.drill.exec.testing.Controls#newBuilder()} builder.
    */
-
   public void setControls(String controls) {
     ControlsInjectionUtil.validateControlsString(controls);
     alterSession(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
@@ -309,10 +316,8 @@ public class ClientFixture implements AutoCloseable {
    * <li><tt>ALTER SESSION SET `foo` = ";"</tt></li>
    * <li><tt>SELECT * FROM bar WHERE x = "\";"</tt></li>
    */
-
   public static class StatementParser {
     private final Reader in;
-    private StringBuilder buf;
 
     public StatementParser(Reader in) {
       this.in = in;
@@ -320,7 +325,7 @@ public class ClientFixture implements AutoCloseable {
 
     public String parseNext() throws IOException {
       boolean eof = false;
-      buf = new StringBuilder();
+      StringBuilder buf = new StringBuilder();
       for (;;) {
         int c = in.read();
         if (c == -1) {
@@ -377,11 +382,12 @@ public class ClientFixture implements AutoCloseable {
 
   /**
    * Execute a set of statements from a file.
+   *
    * @param source the set of statements, separated by semicolons
    * @return the number of statements executed
+   * @throws IOException if anable to execute statements from file
    */
-
-  public int exec(File source) throws FileNotFoundException, IOException {
+  public int exec(File source) throws IOException {
     try (Reader in = new BufferedReader(new FileReader(source))) {
       return exec(in);
     }
@@ -389,10 +395,10 @@ public class ClientFixture implements AutoCloseable {
 
   /**
    * Execute a set of statements from a string.
+   *
    * @param stmts the set of statements, separated by semicolons
    * @return the number of statements executed
    */
-
   public int exec(String stmts) {
     try (Reader in = new StringReader(stmts)) {
       return exec(in);
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 94f5d85..6895d93 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -19,17 +19,17 @@ package org.apache.drill.test;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -61,6 +61,8 @@ import 
org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
 import org.apache.drill.test.ClientFixture.StatementParser;
 import org.joda.time.Period;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Builder for a Drill query. Provides all types of query formats,
  * and a variety of ways to run the query.
@@ -73,7 +75,7 @@ public class QueryBuilder {
    * using a {@link QuerySummaryFuture}.
    */
 
-  public class SummaryOnlyQueryEventListener implements UserResultsListener {
+  public static class SummaryOnlyQueryEventListener implements 
UserResultsListener {
 
     /**
      * The future to be notified. Created here and returned by the
@@ -123,7 +125,7 @@ public class QueryBuilder {
    * just the summary of the query.
    */
 
-  public class QuerySummaryFuture implements Future<QuerySummary> {
+  public static class QuerySummaryFuture implements Future<QuerySummary> {
 
     /**
      * Synchronizes the listener thread and the test thread that
@@ -153,7 +155,7 @@ public class QueryBuilder {
     public boolean isDone() { return summary != null; }
 
     @Override
-    public QuerySummary get() throws InterruptedException, ExecutionException {
+    public QuerySummary get() throws InterruptedException {
       lock.await();
       return summary;
     }
@@ -163,8 +165,7 @@ public class QueryBuilder {
      */
 
     @Override
-    public QuerySummary get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
+    public QuerySummary get(long timeout, TimeUnit unit) throws 
InterruptedException {
       return get();
     }
 
@@ -177,7 +178,6 @@ public class QueryBuilder {
   /**
    * Summary results of a query: records, batches, run time.
    */
-
   public static class QuerySummary {
     private final QueryId queryId;
     private final int records;
@@ -261,7 +261,6 @@ public class QueryBuilder {
    * @param planFragments fragments that make up the plan
    * @return this builder
    */
-
   public QueryBuilder plan(List<PlanFragment> planFragments) {
     queryType = QueryType.EXECUTION;
     this.planFragments = planFragments;
@@ -275,8 +274,7 @@ public class QueryBuilder {
    * optional ending semi-colon
    * @return this builder
    */
-
-  public QueryBuilder sql(File file) throws FileNotFoundException, IOException 
{
+  public QueryBuilder sql(File file) throws IOException {
     try (BufferedReader in = new BufferedReader(new FileReader(file))) {
       StatementParser parser = new StatementParser(in);
       String sql = parser.parseNext();
@@ -297,7 +295,6 @@ public class QueryBuilder {
    * @param resource Name of the resource
    * @return this builder
    */
-
   public QueryBuilder sqlResource(String resource) {
     sql(ClusterFixture.loadResource(resource));
     return this;
@@ -321,7 +318,6 @@ public class QueryBuilder {
    * @return the query summary
    * @throws Exception if anything goes wrong anywhere in the execution
    */
-
   public QuerySummary run() throws Exception {
     return produceSummary(withEventListener());
   }
@@ -330,9 +326,8 @@ public class QueryBuilder {
    * Run the query and return a list of the result batches. Use
    * if the batch count is small and you want to work with them.
    * @return a list of batches resulting from the query
-   * @throws RpcException
+   * @throws RpcException if anything goes wrong
    */
-
   public List<QueryDataBatch> results() throws RpcException {
     Preconditions.checkNotNull(queryType, "Query not provided.");
     Preconditions.checkNotNull(queryText, "Query not provided.");
@@ -345,13 +340,12 @@ public class QueryBuilder {
    * by the code using a {@link RowSetReader}.
    * <p>
    *
-   * @see {@link #rowSetIterator()} for a version that reads a series of
+   * @see #rowSetIterator() for a version that reads a series of
    * batches as row sets.
    * @return a row set that represents the first non-empty batch returned from
    * the query
    * @throws RpcException if anything goes wrong
    */
-
   public DirectRowSet rowSet() throws RpcException {
 
     // Ignore all but the first non-empty batch.
@@ -435,7 +429,6 @@ public class QueryBuilder {
    * @param <V> vector class
    * @param <T> return type
    * @return result produced by {@code reader} lambda or {@code null} if no 
records returned from the query
-   *
    */
   @SuppressWarnings("unchecked")
   public <T, V> T vectorValue(String columnName, Class<V> vectorClass, 
VectorQueryReader<T, V> reader)
@@ -543,7 +536,6 @@ public class QueryBuilder {
    * @return the value of the first column of the first row
    * @throws RpcException if anything goes wrong
    */
-
   public String singletonString() throws RpcException {
     return singletonGeneric(ScalarReader::getString);
   }
@@ -554,7 +546,6 @@ public class QueryBuilder {
    *
    * @param listener the Drill listener
    */
-
   public void withListener(UserResultsListener listener) {
     Preconditions.checkNotNull(queryType, "Query not provided.");
     if (planFragments != null) {
@@ -578,7 +569,6 @@ public class QueryBuilder {
    *
    * @return the query event listener
    */
-
   public BufferingQueryEventListener withEventListener() {
     BufferingQueryEventListener listener = new BufferingQueryEventListener();
     withListener(listener);
@@ -598,15 +588,12 @@ public class QueryBuilder {
   }
 
   /**
-   * <p>
-   *   Run a query and logs the output in TSV format.
-   *   Similar to {@link QueryTestUtil#testRunAndLog} with one query.
-   * </p>
+   * Run a query and logs the output in TSV format.
+   * Similar to {@link QueryTestUtil#testRunAndLog} with one query.
    *
    * @return The number of rows returned.
-   * @throws Exception If anything goes wrong with query execution.
    */
-  public long log() throws Exception {
+  public long log() {
     return log(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
   }
 
@@ -623,15 +610,12 @@ public class QueryBuilder {
   }
 
   /**
-   * <p>
-   *   Runs a query and prints the output to stdout in TSV format.
-   *   Similar to {@link QueryTestUtil#testRunAndLog} with one query.
-   * </p>
+   * Runs a query and prints the output to stdout in TSV format.
+   * Similar to {@link QueryTestUtil#testRunAndLog} with one query.
    *
    * @return The number of rows returned.
-   * @throws Exception If anything goes wrong with query execution.
    */
-  public long print() throws Exception {
+  public long print() {
     return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
   }
 
@@ -639,8 +623,9 @@ public class QueryBuilder {
    * Run the query asynchronously, returning a future to be used
    * to check for query completion, wait for completion, and obtain
    * the result summary.
+   *
+   * @return query summary future
    */
-
   public QuerySummaryFuture futureSummary() {
     QuerySummaryFuture future = new QuerySummaryFuture();
     withListener(new SummaryOnlyQueryEventListener(future));
@@ -663,9 +648,10 @@ public class QueryBuilder {
   /**
    * Submit an "EXPLAIN" statement, and return text form of the
    * plan.
+   *
+   * @return explain plan
    * @throws Exception if the query fails
    */
-
   public String explainText() throws Exception {
     return explain(ClusterFixture.EXPLAIN_PLAN_TEXT);
   }
@@ -673,9 +659,10 @@ public class QueryBuilder {
   /**
    * Submit an "EXPLAIN" statement, and return the JSON form of the
    * plan.
+   *
+   * @return explain plan
    * @throws Exception if the query fails
    */
-
   public String explainJson() throws Exception {
     return explain(ClusterFixture.EXPLAIN_PLAN_JSON);
   }
@@ -685,12 +672,24 @@ public class QueryBuilder {
     return queryPlan(format);
   }
 
+  /**
+   * Submits explain plan statement
+   * and creates plan matcher instance based on return query plan.
+   *
+   * @return plan matcher
+   * @throws Exception if the query fails
+   */
+  public PlanMatcher planMatcher() throws Exception {
+    String plan = explainText();
+    return new PlanMatcher(plan);
+  }
+
   private QuerySummary produceSummary(BufferingQueryEventListener listener) 
throws Exception {
     long start = System.currentTimeMillis();
     int recordCount = 0;
     int batchCount = 0;
     QueryId queryId = null;
-    QueryState state = null;
+    QueryState state;
     loop:
     for (;;) {
       QueryEvent event = listener.get();
@@ -727,11 +726,13 @@ public class QueryBuilder {
    * Submit an "EXPLAIN" statement, and return the column value which
    * contains the plan's string.
    * <p>
-   * Cribbed from {@link PlanTestBase#getPlanInString(String, String)}
-   * @throws Exception if anything goes wrogn in the query
+   * Cribbed from PlanTestBase#getPlanInString(String, String)
+   *
+   * @param columnName column name to extract from result
+   * @return query plan
+   * @throws Exception if anything goes wrong in the query
    */
-
-  protected String queryPlan(String columnName) throws Exception {
+  private String queryPlan(String columnName) throws Exception {
     Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain 
an SQL query.");
     final List<QueryDataBatch> results = results();
     final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
@@ -764,4 +765,69 @@ public class QueryBuilder {
 
     return builder.toString();
   }
+
+  /**
+   * Collects expected and non-expected query patterns.
+   * Upon {@link #match()} method call, matches given patterns to the query 
plan.
+   */
+  public static class PlanMatcher {
+
+    private static final String EXPECTED_NOT_FOUND = "Did not find expected 
pattern";
+    private static final String UNEXPECTED_FOUND = "Found unwanted pattern";
+
+    private final String plan;
+    private final List<String> included = new ArrayList<>();
+    private final List<String> excluded = new ArrayList<>();
+
+    public PlanMatcher(String plan) {
+      this.plan = plan;
+    }
+
+    public PlanMatcher include(String... patterns) {
+      included.addAll(Arrays.asList(patterns));
+      return this;
+    }
+
+    public PlanMatcher exclude(String... patterns) {
+      excluded.addAll(Arrays.asList(patterns));
+      return this;
+    }
+
+    /**
+     * Checks if stored patterns (string parts) are included or excluded in 
the given plan.
+     *
+     * <p/>
+     * Example: <br/>
+     * For the plan:
+     * <pre>
+     * 00-00    Screen
+     * 00-01      Project(cnt=[$0])
+     * 00-02        DirectScan(groupscan=[selectionRoot = 
classpath:/tpch/nation.parquet,
+     * numFiles = 1, usedMetadataSummaryFile = false, 
DynamicPojoRecordReader{records = [[25]]}])
+     * </pre>
+     *
+     * <ul>
+     *  <li>To check that number of files are 1 and DynamicPojoRecordReader is 
used:
+     *  <code>planMatcher.include("numFiles = 1", 
"DynamicPojoRecordReader")</code></li>
+     *  <li>To check that metadata summary file was not used:
+     *  <code>planMatcher.exclude("usedMetadataSummaryFile = true")</code></li>
+     * </ul>
+     *
+     *  Calling <code>planMatcher.match()</code> method would check that given 
patterns are present
+     *  or absent in the given plan. Method execution will fail with {@link 
AssertionError}
+     *  only if expected pattern was not matched or unexpected pattern was 
matched.
+     */
+    public void match() {
+      included.forEach(pattern -> match(pattern, true));
+      excluded.forEach(pattern -> match(pattern, false));
+    }
+
+    private void match(String patternString, boolean expectedResult) {
+      Pattern pattern = Pattern.compile(patternString);
+      Matcher matcher = pattern.matcher(plan);
+      String message = String.format("%s in plan: %s\n%s",
+        expectedResult ? EXPECTED_NOT_FOUND : UNEXPECTED_FOUND, patternString, 
plan);
+      assertEquals(message, expectedResult, matcher.find());
+    }
+  }
 }

Reply via email to