http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index f36c193..f0653f7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
+import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -31,11 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.DrillTestWrapper.TestServices;
-import org.apache.drill.QueryTestUtil;
-import org.apache.drill.TestBuilder;
+import org.apache.drill.test.DrillTestWrapper.TestServices;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -57,22 +54,24 @@ import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
 import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
 import 
org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
-import org.apache.drill.exec.util.TestUtilities;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Resources;
 
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DEFAULT_SCHEMA;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA;
+
 /**
  * Test fixture to start a Drillbit with provide options, create a client, and
  * execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides
  * a builder to set the necessary embedded Drillbit and client options, then
  * creates the requested Drillbit and client.
  */
-
 public class ClusterFixture extends BaseFixture implements AutoCloseable {
-  // private static final org.slf4j.Logger logger =
-  // org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
   public static final String ENABLE_FULL_CACHE = 
"drill.exec.test.use-full-cache";
   public static final int MAX_WIDTH_PER_NODE = 2;
 
@@ -87,10 +86,10 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
       // The CTTAS function requires that the default temporary workspace be
       // writable. By default, the default temporary workspace points to
       // dfs.tmp. But, the test setup marks dfs.tmp as read-only. To work
-      // around this, tests are supposed to use dfs_test. So, we need to
-      // set the default temporary workspace to dfs_test.tmp.
+      // around this, tests are supposed to use dfs. So, we need to
+      // set the default temporary workspace to dfs.tmp.
 
-      put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, 
BaseTestQuery.TEMP_SCHEMA);
+      put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, DFS_TMP_SCHEMA);
       put(ExecConstants.HTTP_ENABLE, false);
       put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, true);
       put("drill.catastrophic_to_standard_out", true);
@@ -121,7 +120,6 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
       // mode.
 
       
put(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT, 
"/tmp/drill/tests");
-      put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, 
"file:/tmp/drill/tests");
     }
   };
 
@@ -132,30 +130,22 @@ public class ClusterFixture extends BaseFixture 
implements AutoCloseable {
   private boolean ownsZK;
   private ZookeeperHelper zkHelper;
   private RemoteServiceSet serviceSet;
-  private File dfsTestTempDir;
   protected List<ClientFixture> clients = new ArrayList<>();
   protected RestClientFixture restClientFixture;
   private boolean usesZk;
-  private boolean preserveLocalFiles;
-  private boolean isLocal;
   private Properties clientProps;
-
-  /**
-   * Temporary directories created for this test cluster.
-   * Each is removed when closing the cluster.
-   */
-
-  private List<File> tempDirs = new ArrayList<>();
+  private final ClusterFixtureBuilder builder;
 
   ClusterFixture(ClusterFixtureBuilder builder) {
+    this.builder = Preconditions.checkNotNull(builder);
 
-    setClientProps(builder);
-    configureZk(builder);
+    setClientProps();
+    configureZk();
     try {
-      createConfig(builder);
+      createConfig();
       allocator = RootAllocatorFactory.newRoot(config);
-      startDrillbits(builder);
-      applyOptions(builder);
+      startDrillbits();
+      applyOptions();
     } catch (Exception e) {
       // Translate exceptions to unchecked to avoid cluttering
       // tests. Failures will simply fail the test itself.
@@ -166,9 +156,8 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
 
   /**
    * Set the client properties to be used by client fixture.
-   * @param builder {@link ClusterFixtureBuilder#clientProps}
    */
-  private void setClientProps(ClusterFixtureBuilder builder) {
+  private void setClientProps() {
       clientProps = builder.clientProps;
   }
 
@@ -176,8 +165,7 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
     return clientProps;
   }
 
-  private void configureZk(ClusterFixtureBuilder builder) {
-
+  private void configureZk() {
     // Start ZK if requested.
 
     String zkConnect = null;
@@ -213,8 +201,7 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
     }
   }
 
-  private void createConfig(ClusterFixtureBuilder builder) throws Exception {
-
+  private void createConfig() throws Exception {
     // Create a config
     // Because of the way DrillConfig works, we can set the ZK
     // connection string only if a property set is provided.
@@ -226,44 +213,14 @@ public class ClusterFixture extends BaseFixture 
implements AutoCloseable {
 
       serviceSet = null;
       usesZk = true;
-      isLocal = false;
     } else {
       // Embedded Drillbit.
 
       serviceSet = RemoteServiceSet.getLocalServiceSet();
-      isLocal = true;
     }
   }
 
-  private void startDrillbits(ClusterFixtureBuilder builder) throws Exception {
-//    // Ensure that Drill uses the log directory determined here rather than
-//    // it's hard-coded defaults. WIP: seems to be needed some times but
-//    // not others.
-//
-//    String logDir = null;
-//    if (builder.tempDir != null) {
-//      logDir = builder.tempDir.getAbsolutePath();
-//    }
-//    if (logDir == null) {
-//      logDir = config.getString(ExecConstants.DRILL_TMP_DIR);
-//      if (logDir != null) {
-//        logDir += "/drill/log";
-//      }
-//    }
-//    if (logDir == null) {
-//      logDir = "/tmp/drill";
-//    }
-//    new File(logDir).mkdirs();
-//    System.setProperty("drill.log-dir", logDir);
-
-    dfsTestTempDir = makeTempDir("dfs-test");
-
-    // Clean up any files that may have been left from the
-    // last run.
-
-    preserveLocalFiles = builder.preserveLocalFiles;
-    removeLocalFiles();
-
+  private void startDrillbits() throws Exception {
     // Start the Drillbits.
 
     Preconditions.checkArgument(builder.bitCount > 0);
@@ -302,12 +259,16 @@ public class ClusterFixture extends BaseFixture 
implements AutoCloseable {
   }
 
   private void configureStoragePlugins(Drillbit bit) throws Exception {
-    // Create the dfs_test name space
+    // Create the dfs name space
+    builder.dirTestWatcher.newDfsTestTmpDir();
 
     @SuppressWarnings("resource")
     final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
-    TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, 
dfsTestTempDir.getAbsolutePath());
-    TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+    StoragePluginTestUtils.configureFormatPlugins(pluginRegistry);
+
+    
StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME,
 pluginRegistry, builder.dirTestWatcher.getDfsTestTmpDir(), TMP_SCHEMA);
+    
StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME,
 pluginRegistry, builder.dirTestWatcher.getRootDir(), ROOT_SCHEMA);
+    
StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME,
 pluginRegistry, builder.dirTestWatcher.getRootDir(), DEFAULT_SCHEMA);
 
     // Create the mock data plugin
 
@@ -319,8 +280,7 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
     ((StoragePluginRegistryImpl) 
pluginRegistry).definePlugin(MockStorageEngineConfig.NAME, config, plugin);
   }
 
-  private void applyOptions(ClusterFixtureBuilder builder) throws Exception {
-
+  private void applyOptions() throws Exception {
     // Apply system options
 
     if (builder.systemOptions != null) {
@@ -342,7 +302,6 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
   public Drillbit drillbit(String name) { return bits.get(name); }
   public Collection<Drillbit> drillbits() { return bits.values(); }
   public RemoteServiceSet serviceSet() { return serviceSet; }
-  public File getDfsTestTmpDir() { return dfsTestTempDir; }
 
   public ClientFixture.ClientBuilder clientBuilder() {
     return new ClientFixture.ClientBuilder(this);
@@ -442,74 +401,6 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
       }
     }
     zkHelper = null;
-
-    // Delete any local files, if we wrote to the local
-    // persistent store. But, leave the files if the user wants
-    // to review them, for debugging, say. Note that, even if the
-    // files are preserved here, they will be removed when the
-    // next cluster fixture starts, else the CTTAS initialization
-    // will fail.
-
-    if (! preserveLocalFiles) {
-      try {
-        removeLocalFiles();
-      } catch (Exception e) {
-        ex = ex == null ? e : ex;
-      }
-    }
-
-    // Remove temporary directories created for this cluster session.
-
-    try {
-      removeTempDirs();
-    } catch (Exception e) {
-      ex = ex == null ? e : ex;
-    }
-    if (ex != null) {
-      throw ex;
-    }
-  }
-
-  /**
-   * Removes files stored locally in the "local store provider."
-   * Required because CTTAS setup fails if these files are left from one
-   * run to the next.
-   *
-   * @throws IOException if a directory cannot be deleted
-   */
-
-  private void removeLocalFiles() throws IOException {
-
-    // Don't delete if this is not a local Drillbit.
-
-    if (! isLocal) {
-      return;
-    }
-
-    // Remove the local files if they exist.
-
-    String localStoreLocation = 
config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH);
-    removeDir(new File(localStoreLocation));
-  }
-
-  private void removeTempDirs() throws IOException {
-    IOException ex = null;
-    for (File dir : tempDirs) {
-      try {
-        removeDir(dir);
-      } catch (IOException e) {
-        ex = ex == null ? e : ex;
-      }
-    }
-    if (ex != null) {
-      throw ex;
-    }
-  }
-
-  public void removeDir(File dir) throws IOException {
-    if (dir.exists()) {
-      FileUtils.deleteDirectory(dir);
-    }
   }
 
   /**
@@ -537,7 +428,7 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
    * Define a workspace within an existing storage plugin. Useful for
    * pointing to local file system files outside the Drill source tree.
    *
-   * @param pluginName name of the plugin like "dfs" or "dfs_test".
+   * @param pluginName name of the plugin like "dfs".
    * @param schemaName name of the new schema
    * @param path directory location (usually local)
    * @param defaultFormat default format for files in the schema
@@ -584,11 +475,14 @@ public class ClusterFixture extends BaseFixture 
implements AutoCloseable {
   public static final String EXPLAIN_PLAN_TEXT = "text";
   public static final String EXPLAIN_PLAN_JSON = "json";
 
-  public static ClusterFixtureBuilder builder() {
-    ClusterFixtureBuilder builder = new ClusterFixtureBuilder()
+  public static ClusterFixtureBuilder builder(BaseDirTestWatcher 
dirTestWatcher) {
+      ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
          .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, 
MAX_WIDTH_PER_NODE);
     Properties props = new Properties();
     props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
+    props.setProperty(ExecConstants.DRILL_TMP_DIR, 
dirTestWatcher.getTmpDir().getAbsolutePath());
+    props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, 
dirTestWatcher.getStoreDir().getAbsolutePath());
+
     builder.configBuilder.configProps(props);
     return builder;
   }
@@ -603,9 +497,8 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
    *
    * @return a fixture builder with no default properties set
    */
-
-  public static ClusterFixtureBuilder bareBuilder() {
-    return new ClusterFixtureBuilder();
+  public static ClusterFixtureBuilder bareBuilder(BaseDirTestWatcher 
dirTestWatcher) {
+    return new ClusterFixtureBuilder(dirTestWatcher);
   }
 
   /**
@@ -645,8 +538,8 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
    * @return a cluster fixture with standard options
    * @throws Exception if something goes wrong
    */
-  public static ClusterFixture standardCluster() {
-    return builder().build();
+  public static ClusterFixture standardCluster(BaseDirTestWatcher 
dirTestWatcher) {
+    return builder(dirTestWatcher).build();
   }
 
   /**
@@ -660,7 +553,7 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
 
   public static String stringify(Object value) {
     if (value instanceof String) {
-      return "'" + (String) value + "'";
+      return "'" + value + "'";
     } else {
       return value.toString();
     }
@@ -714,34 +607,20 @@ public class ClusterFixture extends BaseFixture 
implements AutoCloseable {
   }
 
   /**
-   * Create a temporary directory which will be removed when the
-   * cluster closes.
-   *
-   * @param dirName the name of the leaf directory
-   * @return the path to the temporary directory which is usually
-   * under the temporary directory structure for this machine
-   */
-
-  public File makeTempDir(final String dirName) {
-    File dir = getTempDir(dirName);
-    tempDirs.add(dir);
-    return dir;
-  }
-
-  /**
    * Create a temporary data directory which will be removed when the
    * cluster closes, and register it as a "dfs" name space.
    *
-   * @param key the name to use for the directory and the name space.
+   * @param key The name to use for the directory and the name space.
    * Access the directory as "dfs.<key>".
-   * @param defaultFormat default storage format for the workspace
+   * @param defaultFormat The default storage format for the workspace.
+   * @param formatPluginConfig The format plugin config.
    * @return location of the directory which can be used to create
    * temporary input files
    */
 
-  public File makeDataDir(String key, String defaultFormat) {
-    File dir = makeTempDir(key);
-    defineWorkspace("dfs", key, dir.getAbsolutePath(), defaultFormat);
+  public File makeDataDir(String key, String defaultFormat, FormatPluginConfig 
formatPluginConfig) {
+    File dir = builder.dirTestWatcher.makeSubDir(Paths.get(key));
+    defineWorkspace("dfs", key, dir.getAbsolutePath(), defaultFormat, 
formatPluginConfig);
     return dir;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 8295554..82bcf75 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -17,18 +17,18 @@
  
******************************************************************************/
 package org.apache.drill.test;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.server.options.OptionDefinition;
 
 /**
  * Build a Drillbit and client with the options provided. The simplest
- * builder starts an embedded Drillbit, with the "dfs_test" name space,
+ * builder starts an embedded Drillbit, with the "dfs" name space,
  * a max width (parallelization) of 2.
  */
 
@@ -48,9 +48,6 @@ public class ClusterFixtureBuilder {
   // in the defaults.
 
   public static final int DEFAULT_ZK_REFRESH = 500; // ms
-  public static final int DEFAULT_SERVER_RPC_THREADS = 10;
-  public static final int DEFAULT_SCAN_THREADS = 8;
-  public static final String OPTION_DEFAULTS_ROOT = "drill.exec.options.";
 
   protected ConfigBuilder configBuilder = new ConfigBuilder();
   protected List<RuntimeOption> sessionOptions;
@@ -60,16 +57,18 @@ public class ClusterFixtureBuilder {
   protected int localZkCount;
   protected ZookeeperHelper zkHelper;
   protected boolean usingZk;
-  protected File tempDir;
-  protected boolean preserveLocalFiles;
   protected Properties clientProps;
+  protected final BaseDirTestWatcher dirTestWatcher;
+
+  public ClusterFixtureBuilder(BaseDirTestWatcher dirTestWatcher) {
+    this.dirTestWatcher = Preconditions.checkNotNull(dirTestWatcher);
+  }
 
   /**
    * The configuration builder which this fixture builder uses.
    * @return the configuration builder for use in setting "advanced"
    * configuration options.
    */
-
   public ConfigBuilder configBuilder() { return configBuilder; }
 
   /**
@@ -87,7 +86,6 @@ public class ClusterFixtureBuilder {
    * @return this builder
    * @see {@link #configProperty(String, Object)}
    */
-
   public ClusterFixtureBuilder configResource(String configResource) {
 
     // TypeSafe gets unhappy about a leading slash, but other functions
@@ -98,21 +96,18 @@ public class ClusterFixtureBuilder {
     return this;
   }
 
-  /**
-   *
-   */
    public ClusterFixtureBuilder setOptionDefault(String key, Object value) {
-     String option_name = OPTION_DEFAULTS_ROOT + key;
+     String option_name = ExecConstants.OPTION_DEFAULTS_ROOT + key;
      configBuilder().put(option_name, value.toString());
      return this;
    }
+
   /**
    * Add an additional boot-time property for the embedded Drillbit.
    * @param key config property name
    * @param value property value
    * @return this builder
    */
-
   public ClusterFixtureBuilder configProperty(String key, Object value) {
     configBuilder.put(key, value.toString());
     return this;
@@ -145,9 +140,7 @@ public class ClusterFixtureBuilder {
    * @param key the name of the session option
    * @param value the value of the session option
    * @return this builder
-   * @see {@link ClusterFixture#alterSession(String, Object)}
    */
-
   public ClusterFixtureBuilder sessionOption(String key, Object value) {
     if (sessionOptions == null) {
       sessionOptions = new ArrayList<>();
@@ -163,9 +156,7 @@ public class ClusterFixtureBuilder {
    * @param key the name of the system option
    * @param value the value of the system option
    * @return this builder
-   * @see {@link ClusterFixture#alterSystem(String, Object)}
    */
-
   public ClusterFixtureBuilder systemOption(String key, Object value) {
     if (systemOptions == null) {
       systemOptions = new ArrayList<>();
@@ -252,44 +243,13 @@ public class ClusterFixtureBuilder {
     return this;
   }
 
-  public ClusterFixtureBuilder tempDir(File path) {
-    this.tempDir = path;
-    return this;
-  }
-
-  /**
-   * Starting with the addition of the CTTAS feature, a Drillbit will
-   * not restart unless we delete all local storage files before
-   * starting the Drillbit again. In particular, the stored copies
-   * of the storage plugin configs cause the temporary workspace
-   * check to fail. Normally the cluster fixture cleans up files
-   * both before starting and after shutting down the cluster. Set this
-   * option to preserve files after shutdown, perhaps to debug the
-   * contents.
-   * <p>
-   * This clean-up is needed only if we enable local storage writes
-   * (which we must do, unfortunately, to capture and analyze
-   * storage profiles.)
-   *
-   * @return this builder
-   */
-
-  public ClusterFixtureBuilder keepLocalFiles() {
-    preserveLocalFiles = true;
-    return this;
-  }
-
   /**
    * Enable saving of query profiles. The only way to save them is
    * to enable local store provider writes, which also saves the
-   * storage plugin configs. Doing so causes the CTTAS feature to
-   * fail on the next run, so the test fixture deletes all local
-   * files on start and close, unless
-   * {@link #keepLocalFiles()} is set.
+   * storage plugin configs.
    *
    * @return this builder
    */
-
   public ClusterFixtureBuilder saveProfiles() {
     configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true);
     systemOption(ExecConstants.ENABLE_QUERY_PROFILE_OPTION, true);
@@ -319,7 +279,6 @@ public class ClusterFixtureBuilder {
    *
    * @return
    */
-
   public ClusterFixture build() {
     return new ClusterFixture(this);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
index 4a3823c..c85c591 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -19,9 +19,7 @@ package org.apache.drill.test;
 
 import java.io.IOException;
 
-import org.apache.drill.TestBuilder;
 import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.test.DrillTest;
 import org.junit.AfterClass;
 
 /**

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
new file mode 100644
index 0000000..99bbacc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.HyperVectorValueIterator;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Assert;
+
+/**
+ * An object to encapsulate the options for a Drill unit test, as well as the 
execution methods to perform the tests and
+ * validation of results.
+ *
+ * To construct an instance easily, look at the TestBuilder class. From an 
implementation of
+ * the BaseTestQuery class, an instance of the builder is accessible through 
the testBuilder() method.
+ */
+public class DrillTestWrapper {
+  // Logger is named after this class so that log output from the wrapper is
+  // attributed to DrillTestWrapper rather than to BaseTestQuery.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTestWrapper.class);
+
+  /**
+   * Services this wrapper requires from whatever hosts the test: a buffer
+   * allocator and a way to execute queries.
+   */
+  public interface TestServices {
+    /** @return the allocator the wrapper should use (see {@code getAllocator()}) */
+    BufferAllocator allocator();
+
+    /**
+     * Runs the given query.
+     * NOTE(review): presumably results are discarded; confirm against implementers.
+     */
+    void test(String query) throws Exception;
+
+    /**
+     * Runs a query of the given type and returns the resulting data batches.
+     * The query is passed as {@code Object}; its concrete type depends on {@code type}.
+     */
+    List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws 
Exception;
+  }
+
+  // TODO - when in JSON, read baseline in all text mode to avoid precision 
loss for decimal values
+
+  // This flag will enable all of the values that are validated to be logged. 
For large validations this is time consuming
+  // so this is not exposed in a way that it can be enabled for an individual 
test. It can be changed here while debugging
+  // a test to see all of the output, but as this framework is doing full 
validation, there is no reason to keep it on as
+  // it will only make the test slower.
+  private static boolean VERBOSE_DEBUG = false;
+
+  // Unit test doesn't expect any specific batch count
+  public static final int EXPECTED_BATCH_COUNT_NOT_SET = -1;
+
+  // The motivation behind the TestBuilder was to provide a clean API for test 
writers. The model is mostly designed to
+  // prepare all of the components necessary for running the tests, before the 
TestWrapper is initialized. There is however
+  // one case where the setup for the baseline is driven by the test query 
results, and this is implicit type enforcement
+  // for the baseline data. In this case there needs to be a call back into 
the TestBuilder once we know the type information
+  // from the test query.
+  private TestBuilder testBuilder;
+  /**
+   * Test query to run. Type of object depends on the {@link #queryType}
+   */
+  private Object query;
+  // The type of query provided
+  private UserBitShared.QueryType queryType;
+  // The type of query provided for the baseline
+  private UserBitShared.QueryType baselineQueryType;
+  // should ordering be enforced in the baseline check
+  private boolean ordered;
+  private TestServices services;
+  // queries to run before the baseline or test queries, can be used to set 
options
+  private String baselineOptionSettingQueries;
+  private String testOptionSettingQueries;
+  // two different methods are available for comparing ordered results, the 
default reads all of the records
+  // into giant lists of objects, like one giant on-heap batch of 'vectors'
+  // this flag enables the other approach which iterates through a hyper batch 
for the test query results and baseline
+  // while this does work faster and use less memory, it can be harder to 
debug as all of the elements are not in a
+  // single list
+  private boolean highPerformanceComparison;
+  // if the baseline is a single option test writers can provide the baseline 
values and columns
+  // without creating a file, these are provided to the builder in the 
baselineValues() and baselineColumns() methods
+  // and translated into a map in the builder
+  private String[] baselineColumns;
+  private List<Map<String, Object>> baselineRecords;
+
+  private int expectedNumBatches;
+
+  /**
+   * Stores the complete configuration for one validation run. Nothing is
+   * executed here; call {@link #run()} to perform the comparison.
+   *
+   * @param testBuilder builder consulted at run time (e.g. for the expected schema)
+   * @param services provides the allocator and query execution
+   * @param query test query to run; its concrete type depends on {@code queryType}
+   * @param queryType type of the test query
+   * @param baselineOptionSettingQueries queries to run before the baseline query, e.g. to set options
+   * @param testOptionSettingQueries queries to run before the test query, e.g. to set options
+   * @param baselineQueryType type of the baseline query
+   * @param ordered whether ordering is enforced in the baseline check
+   * @param highPerformanceComparison selects the hyper-batch based comparison of ordered
+   *        results instead of reading all records into in-memory lists
+   * @param baselineColumns baseline column names supplied directly by the test
+   * @param baselineRecords baseline values supplied directly by the test
+   * @param expectedNumBatches expected batch count; presumably
+   *        {@link #EXPECTED_BATCH_COUNT_NOT_SET} when the test does not care (confirm)
+   */
+  public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
+      String baselineOptionSettingQueries, String testOptionSettingQueries,
+      QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
+      String[] baselineColumns, List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
+    // Collaborators.
+    this.testBuilder = testBuilder;
+    this.services = services;
+
+    // Test query and its option-setting preamble.
+    this.query = query;
+    this.queryType = queryType;
+    this.testOptionSettingQueries = testOptionSettingQueries;
+
+    // Baseline description.
+    this.baselineQueryType = baselineQueryType;
+    this.baselineOptionSettingQueries = baselineOptionSettingQueries;
+    this.baselineColumns = baselineColumns;
+    this.baselineRecords = baselineRecords;
+
+    // Comparison behaviour.
+    this.ordered = ordered;
+    this.highPerformanceComparison = highPerformanceComparison;
+    this.expectedNumBatches = expectedNumBatches;
+  }
+
+  /**
+   * Executes the configured validation. When the builder carries an expected
+   * schema only the schema is compared; otherwise the results are compared,
+   * with or without ordering depending on the {@code ordered} flag.
+   *
+   * @throws Exception propagated from the selected comparison
+   */
+  public void run() throws Exception {
+    final boolean schemaOnly = testBuilder.getExpectedSchema() != null;
+    if (schemaOnly) {
+      compareSchemaOnly();
+      return;
+    }
+    if (ordered) {
+      compareOrderedResults();
+      return;
+    }
+    compareUnorderedResults();
+  }
+
+  /** Returns the allocator provided by the {@link TestServices} passed to the constructor. */
+  private BufferAllocator getAllocator() {
+    return services.allocator();
+  }
+
+  /**
+   * Compares expected and actual results column by column. For every expected
+   * column the actual column must exist, hold the same total number of records,
+   * and match value for value.
+   * <p>
+   * The value vectors of both sides are cleared before this method returns,
+   * including when a check fails, so a failing comparison does not leave the
+   * vectors uncleared.
+   *
+   * @param expectedRecords baseline iterators keyed by column name
+   * @param actualRecords query-result iterators keyed by column name
+   * @throws Exception propagated from the per-value comparison
+   */
+  private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
+      Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
+    try {
+      for (Map.Entry<String, HyperVectorValueIterator> expectedColumn : expectedRecords.entrySet()) {
+        String s = expectedColumn.getKey();
+        HyperVectorValueIterator expectedValues = expectedColumn.getValue();
+        HyperVectorValueIterator actualValues = actualRecords.get(s);
+        assertNotNull("Expected column '" + s + "' not found.", actualValues);
+        assertEquals(expectedValues.getTotalRecords(), actualValues.getTotalRecords());
+        int i = 0;
+        // Totals were asserted equal above, so actualValues has a value for each expected one.
+        while (expectedValues.hasNext()) {
+          compareValuesErrorOnMismatch(expectedValues.next(), actualValues.next(), i, s);
+          i++;
+        }
+      }
+    } finally {
+      // Previously skipped whenever an assertion or comparison threw.
+      cleanupHyperValueIterators(expectedRecords.values());
+      cleanupHyperValueIterators(actualRecords.values());
+    }
+  }
+
+  /**
+   * Clears every value vector reachable through the given hyper-vector iterators.
+   *
+   * @param hyperBatches iterators whose underlying vectors should be cleared
+   */
+  private void cleanupHyperValueIterators(Collection<HyperVectorValueIterator> hyperBatches) {
+    for (HyperVectorValueIterator batchIterator : hyperBatches) {
+      for (ValueVector vector : batchIterator.getHyperVector().getValueVectors()) {
+        vector.clear();
+      }
+    }
+  }
+
+  /**
+   * Compares query results against the baseline, one column at a time.
+   * Every column returned by the query must exist in the baseline with the same
+   * number of rows and equal values. A value mismatch is rethrown with a dump of
+   * the records near the failing row appended to the message. Finally, if the
+   * query returned fewer columns than the baseline, the missing columns are reported.
+   *
+   * @param expectedRecords baseline values keyed by column name
+   * @param actualRecords query-result values keyed by column name
+   * @throws Exception on a value mismatch or when baseline columns are missing from the result
+   */
+  public static void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
+    for (String column : actualRecords.keySet()) {
+      assertNotNull("Unexpected extra column " + column + " returned by query.", expectedRecords.get(column));
+
+      List<?> expectedValues = expectedRecords.get(column);
+      List<?> actualValues = actualRecords.get(column);
+      assertEquals("Incorrect number of rows returned by query.", expectedValues.size(), actualValues.size());
+      assertEquals("Different number of records returned", expectedValues.size(), actualValues.size());
+
+      final int rowCount = expectedValues.size();
+      for (int row = 0; row < rowCount; row++) {
+        try {
+          compareValuesErrorOnMismatch(expectedValues.get(row), actualValues.get(row), row, column);
+        } catch (Exception ex) {
+          // Add surrounding records to the message to make the mismatch easier to diagnose.
+          String context = printNearbyRecords(expectedRecords, actualRecords, row);
+          throw new Exception(ex.getMessage() + "\n\n" + context, ex);
+        }
+      }
+    }
+
+    // The loop above only visits returned columns; baseline columns absent from the result are caught here.
+    if (expectedRecords.size() > actualRecords.size()) {
+      throw new Exception(findMissingColumns(expectedRecords.keySet(), actualRecords.keySet()));
+    }
+  }
+
+  /**
+   * Renders the expected and actual records surrounding a verification failure.
+   * <p>
+   * Up to ten records are printed, starting five records before {@code offset}
+   * (or at record 0) and never past the end of the shorter of the two result
+   * sets. Sizes are taken from the first column of each map.
+   *
+   * @param expectedRecords baseline values, keyed by column
+   * @param actualRecords values returned by the test query, keyed by column
+   * @param offset index of the record at which the comparison failed
+   * @return the expected records followed by the actual records, as text
+   */
+  private static String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) {
+    StringBuilder expected = new StringBuilder();
+    StringBuilder actual = new StringBuilder();
+    expected.append("Expected Records near verification failure:\n");
+    actual.append("Actual Records near verification failure:\n");
+    int firstRecordToPrint = Math.max(0, offset - 5);
+    // Size each side from its own first column; an empty map contributes no rows.
+    int expectedCount = expectedRecords.isEmpty() ? 0 : expectedRecords.values().iterator().next().size();
+    int actualCount = actualRecords.isEmpty() ? 0 : actualRecords.values().iterator().next().size();
+    // The window is relative to the first printed record, not to record 0.
+    int endRecordExclusive = Math.min(firstRecordToPrint + 10, Math.min(expectedCount, actualCount));
+    for (int i = firstRecordToPrint; i < endRecordExclusive; i++) {
+      expected.append("Record Number: ").append(i).append(" { ");
+      actual.append("Record Number: ").append(i).append(" { ");
+      for (String s : actualRecords.keySet()) {
+        List<?> actualValues = actualRecords.get(s);
+        actual.append(s).append(" : ").append(actualValues.get(i)).append(",");
+      }
+      for (String s : expectedRecords.keySet()) {
+        List<?> expectedValues = expectedRecords.get(s);
+        expected.append(s).append(" : ").append(expectedValues.get(i)).append(",");
+      }
+      expected.append(" }\n");
+      actual.append(" }\n");
+    }
+
+    return expected.append("\n\n").append(actual).toString();
+
+  }
+
+  /**
+   * Loads each batch through the given loader and gathers the value vectors of
+   * every column into one hyper vector per column.
+   *
+   * @param records the query batches to load
+   * @param loader the loader used to materialize each batch
+   * @return a map from column path expression to an iterator over that
+   *         column's hyper vector, with total sizes already determined
+   */
+  private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
+      final RecordBatchLoader loader)
+      throws SchemaChangeException, UnsupportedEncodingException {
+    // TODO - this does not handle schema changes
+    final Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
+
+    long totalRecords = 0;
+    for (QueryDataBatch batch : records) {
+      loader.load(batch.getHeader().getDef(), batch.getData());
+      logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
+      totalRecords += loader.getRecordCount();
+      for (VectorWrapper<?> wrapper : loader) {
+        final MaterializedField mf = wrapper.getField();
+        final String field = SchemaPath.getSimplePath(mf.getName()).toExpr();
+        final HyperVectorValueIterator known = combinedVectors.get(field);
+        if (known != null) {
+          // Column seen in an earlier batch: append this batch's vector.
+          known.getHyperVector().addVector(wrapper.getValueVector());
+          continue;
+        }
+        // First occurrence of the column: start a hyper vector with one entry.
+        ValueVector[] vvList = (ValueVector[]) Array.newInstance(mf.getValueClass(), 1);
+        vvList[0] = wrapper.getValueVector();
+        combinedVectors.put(field, new HyperVectorValueIterator(mf, new HyperVectorWrapper<>(mf, vvList)));
+      }
+    }
+    for (HyperVectorValueIterator valueIterator : combinedVectors.values()) {
+      valueIterator.determineTotalSize();
+    }
+    return combinedVectors;
+  }
+
+  /**
+   * Presents a list of query data batches as a sequence of
+   * {@link VectorAccessible}s by loading one batch at a time into a shared
+   * {@link RecordBatchLoader}.
+   * <p>
+   * Each call to {@code next()} clears the loader before loading the next
+   * batch, and every call returns the same loader instance, so a returned
+   * value is only usable until the following call. Closing this object clears
+   * the loader.
+   */
+  private static class BatchIterator implements Iterable<VectorAccessible>, AutoCloseable {
+    private final List<QueryDataBatch> dataBatches;
+    private final RecordBatchLoader batchLoader;
+
+    public BatchIterator(List<QueryDataBatch> dataBatches, RecordBatchLoader batchLoader) {
+      this.dataBatches = dataBatches;
+      this.batchLoader = batchLoader;
+    }
+
+    @Override
+    public Iterator<VectorAccessible> iterator() {
+      return new Iterator<VectorAccessible>() {
+
+        // Index of the batch currently loaded; -1 before the first call to next().
+        int index = -1;
+
+        @Override
+        public boolean hasNext() {
+          return index < dataBatches.size() - 1;
+        }
+
+        @Override
+        public VectorAccessible next() {
+          // Check before advancing so repeated calls past the end keep failing
+          // the same way and never clear the loader or index out of range.
+          if (!hasNext()) {
+            throw new java.util.NoSuchElementException("Tried to call next when iterator had no more items.");
+          }
+          index++;
+          batchLoader.clear();
+          QueryDataBatch batch = dataBatches.get(index);
+          try {
+            batchLoader.load(batch.getHeader().getDef(), batch.getData());
+          } catch (SchemaChangeException e) {
+            throw new RuntimeException(e);
+          }
+          return batchLoader;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Removing is not supported");
+        }
+      };
+    }
+
+    @Override
+    public void close() throws Exception {
+      batchLoader.clear();
+    }
+
+  }
+
+  /**
+   * Reads all batches and merges them into a map whose key is the column's
+   * schema path and whose value is the list of that column's values across
+   * all batches. No expected schema is enforced.
+   *
+   * @param batches the batches to read
+   * @return the merged column values, keyed by schema path
+   * @throws SchemaChangeException declared by the delegate this method calls
+   * @throws UnsupportedEncodingException declared by the delegate this method calls
+   */
+  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+      throws SchemaChangeException, UnsupportedEncodingException {
+    final Map<String, List<Object>> mergedColumns = new TreeMap<>();
+    // The batch count returned by the delegate is not needed here.
+    addToCombinedVectorResults(batches, null, mergedColumns);
+    return mergedColumns;
+  }
+
+  /**
+   * Add to result vectors and compare batch schema against expected schema while iterating batches.
+   *
+   * @param batches the batches to read
+   * @param expectedSchema the expected schema the batches should contain, or {@code null} to skip
+   *                       the check. Throws SchemaChangeException if a batch has a different schema.
+   * @param combinedVectors the map that receives the column values while iterating the batches,
+   *                        keyed by column path expression
+   *
+   * @return number of batches
+   * @throws SchemaChangeException if a batch schema does not equal the expected schema
+   * @throws UnsupportedEncodingException declared in the signature; no statement in this method
+   *                                      throws it directly
+   */
+  public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors)
+       throws SchemaChangeException, UnsupportedEncodingException {
+    // TODO - this does not handle schema changes
+    int numBatch = 0;
+    long totalRecords = 0;
+    BatchSchema schema = null;
+    for (VectorAccessible loader : batches)  {
+      numBatch++;
+      if (expectedSchema != null && !expectedSchema.equals(loader.getSchema())) {
+        throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
+                "Actual schema: %s.  Expected schema : %s",
+            loader.getSchema(), expectedSchema));
+      }
+
+      // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+      // SchemaChangeException, so check/clean throws clause above.
+      if (schema == null) {
+        schema = loader.getSchema();
+        for (MaterializedField mf : schema) {
+          combinedVectors.put(SchemaPath.getSimplePath(mf.getName()).toExpr(), new ArrayList<>());
+        }
+      } else {
+        // TODO - actually handle schema changes, this is just to get access to the SelectionVectorMode
+        // of the current batch, the check for a null schema is used to only mutate the schema once
+        // need to add new vectors and null fill for previous batches? distinction between null and non-existence important?
+        schema = loader.getSchema();
+      }
+      logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
+      totalRecords += loader.getRecordCount();
+      for (VectorWrapper<?> w : loader) {
+        String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr();
+        List<Object> columnValues = combinedVectors.get(field);
+        ValueVector[] vectors;
+        if (w.isHyper()) {
+          vectors = w.getValueVectors();
+        } else {
+          vectors = new ValueVector[] {w.getValueVector()};
+        }
+        SelectionVector2 sv2 = null;
+        SelectionVector4 sv4 = null;
+        switch(schema.getSelectionVectorMode()) {
+          case TWO_BYTE:
+            sv2 = loader.getSelectionVector2();
+            break;
+          case FOUR_BYTE:
+            sv4 = loader.getSelectionVector4();
+            break;
+          default:
+            // Any other mode: no selection vector is used, records are read by position.
+            break;
+        }
+        if (sv4 != null) {
+          for (int j = 0; j < sv4.getCount(); j++) {
+            int complexIndex = sv4.get(j);
+            // Upper 16 bits select the vector, lower 16 bits the record within it.
+            // Use the unsigned shift so a set high bit cannot yield a negative index.
+            int batchIndex = complexIndex >>> 16;
+            int recordIndexInBatch = complexIndex & 65535;
+            Object obj = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch);
+            // Text values are stored as Strings; instanceof is already false for null.
+            if (obj instanceof Text) {
+              obj = obj.toString();
+            }
+            columnValues.add(obj);
+          }
+        }
+        else {
+          for (ValueVector vv : vectors) {
+            for (int j = 0; j < loader.getRecordCount(); j++) {
+              int index;
+              if (sv2 != null) {
+                index = sv2.getIndex(j);
+              } else {
+                index = j;
+              }
+              Object obj = vv.getAccessor().getObject(index);
+              if (obj instanceof Text) {
+                obj = obj.toString();
+              }
+              columnValues.add(obj);
+            }
+          }
+        }
+      }
+    }
+    return numBatch;
+  }
+
+  /**
+   * Runs the test query and checks only the schema of its first batch against
+   * the schema expected by the test builder: same number of columns and, at
+   * each position, the same column path and major type. All returned batches
+   * are released and the loader is cleared afterwards.
+   *
+   * @throws Exception if the column counts differ or a column's path or type
+   *                   does not match
+   */
+  protected void compareSchemaOnly() throws Exception {
+    final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    List<QueryDataBatch> actual = null;
+    try {
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
+      final QueryDataBatch firstBatch = actual.get(0);
+      loader.load(firstBatch.getHeader().getDef(), firstBatch.getData());
+
+      final BatchSchema schema = loader.getSchema();
+      final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = testBuilder.getExpectedSchema();
+      if (schema.getFieldCount() != expectedSchema.size()) {
+        throw new Exception("Expected and actual numbers of columns do not match.");
+      }
+
+      for (int i = 0; i < schema.getFieldCount(); ++i) {
+        final String actualSchemaPath = schema.getColumn(i).getName();
+        final TypeProtos.MajorType actualMajorType = schema.getColumn(i).getType();
+
+        final String expectedSchemaPath = expectedSchema.get(i).getLeft().getRootSegmentPath();
+        final TypeProtos.MajorType expectedMajorType = expectedSchema.get(i).getValue();
+
+        if (actualSchemaPath.equals(expectedSchemaPath) && actualMajorType.equals(expectedMajorType)) {
+          continue;
+        }
+        throw new Exception(String.format("Schema path or type mismatch for column #%d:\n" +
+                "Expected schema path: %s\nActual   schema path: %s\nExpected type: %s\nActual   type: %s",
+            i, expectedSchemaPath, actualSchemaPath, Types.toString(expectedMajorType),
+            Types.toString(actualMajorType)));
+      }
+
+    } finally {
+      if (actual != null) {
+        for (QueryDataBatch returned : actual) {
+          returned.release();
+        }
+      }
+      loader.clear();
+    }
+  }
+
+  /**
+   * Runs the test query and compares its records with the baseline, ignoring
+   * record order. The baseline is taken from the records given to the test
+   * builder when present; otherwise the builder's validation query is run to
+   * produce it. Prefer the simpler interfaces that write the validation query
+   * for you unless one query really must be validated against another.
+   *
+   * @throws Exception if a query fails or the result sets differ
+   */
+  protected void compareUnorderedResults() throws Exception {
+    final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+
+    List<QueryDataBatch> actual = Collections.emptyList();
+    List<QueryDataBatch> expected = Collections.emptyList();
+    final List<Map<String, Object>> actualRecords = new ArrayList<>();
+
+    try {
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
+
+      checkNumBatches(actual);
+
+      addTypeInfoIfMissing(actual.get(0), testBuilder);
+      addToMaterializedResults(actualRecords, actual, loader);
+
+      final List<Map<String, Object>> expectedRecords;
+      if (baselineRecords != null) {
+        expectedRecords = baselineRecords;
+      } else {
+        // If baseline data was not provided to the test builder directly, we must run a query
+        // for the baseline, this includes the cases where the baseline is stored in a file.
+        expectedRecords = new ArrayList<>();
+        test(baselineOptionSettingQueries);
+        expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+        addToMaterializedResults(expectedRecords, expected, loader);
+      }
+
+      compareResults(expectedRecords, actualRecords);
+    } finally {
+      cleanupBatches(actual, expected);
+    }
+  }
+
+  /**
+   * Compares the test query's records with the baseline in order. Uses the
+   * hyper-vector comparison when a high performance comparison was requested,
+   * which requires a baseline query type, and the on-heap merged-vector
+   * comparison otherwise. Prefer the simpler interfaces that write the
+   * validation query for you unless one query must be validated against another.
+   *
+   * @throws Exception if the comparison fails, or if a high performance
+   *                   comparison was requested without a baseline query type
+   */
+  protected void compareOrderedResults() throws Exception {
+    if (!highPerformanceComparison) {
+      compareMergedOnHeapVectors();
+      return;
+    }
+    if (baselineQueryType == null) {
+      throw new Exception("Cannot do a high performance comparison without using a baseline file");
+    }
+    compareResultsHyperVector();
+  }
+
+  /**
+   * Runs the test query, copies its results into on-heap column lists and
+   * compares them, in order, with the baseline.
+   * <p>
+   * The baseline comes from the records given to the test builder when
+   * present. Otherwise, when no baseline query type is set but baseline
+   * columns are, only the ascending ordering of those columns is checked;
+   * in the remaining case the builder's validation query is run.
+   * <p>
+   * Each batch iterator is closed even if reading the batches throws, and all
+   * batches are released on every exit path.
+   *
+   * @throws Exception if a query fails or the results differ; the message of
+   *                   the original exception is extended with the test query
+   */
+  public void compareMergedOnHeapVectors() throws Exception {
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+
+    List<QueryDataBatch> actual = Collections.emptyList();
+    List<QueryDataBatch> expected = Collections.emptyList();
+    Map<String, List<Object>> actualSuperVectors;
+    Map<String, List<Object>> expectedSuperVectors = null;
+
+    try {
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
+
+      checkNumBatches(actual);
+
+      // To avoid extra work for test writers, types can optionally be inferred from the test query
+      addTypeInfoIfMissing(actual.get(0), testBuilder);
+
+      // try-with-resources closes the iterator (clearing the loader) even on failure.
+      try (BatchIterator batchIter = new BatchIterator(actual, loader)) {
+        actualSuperVectors = addToCombinedVectorResults(batchIter);
+      }
+
+      // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
+      // the cases where the baseline is stored in a file.
+      if (baselineRecords == null) {
+        if (baselineQueryType == null && baselineColumns != null) {
+          checkAscendingOrdering(actualSuperVectors);
+          return;
+        } else {
+          test(baselineOptionSettingQueries);
+          expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+          try (BatchIterator exBatchIter = new BatchIterator(expected, loader)) {
+            expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
+          }
+        }
+      } else {
+        // data is built in the TestBuilder in a row major format as it is provided by the user
+        // translate it here to vectorized, the representation expected by the ordered comparison
+        expectedSuperVectors = translateRecordListToHeapVectors(baselineRecords);
+      }
+
+      compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+    } catch (Exception e) {
+      throw new Exception(e.getMessage() + "\nFor query: " + query , e);
+    } finally {
+      cleanupBatches(expected, actual);
+    }
+  }
+
+  /**
+   * Asserts that, in every baseline column, each value compares as less than
+   * or equal to the value in the following row, using
+   * {@code RowSetComparison.ObjectComparator}. Each column is checked on its own.
+   *
+   * @param results column values keyed by column; the row count is taken from
+   *                the first baseline column
+   */
+  private void checkAscendingOrdering(Map<String, List<Object>> results) {
+    final int numRecords = results.get(baselineColumns[0]).size();
+
+    for (int current = 1; current < numRecords; current++) {
+      final int previous = current - 1;
+
+      for (String column : baselineColumns) {
+        final List<Object> values = results.get(column);
+        final Object earlier = values.get(previous);
+        final Object later = values.get(current);
+
+        Assert.assertTrue(RowSetComparison.ObjectComparator.INSTANCE.compare(earlier, later) <= 0);
+      }
+    }
+  }
+
+  /**
+   * Converts row-major records into a column-major map: one list of values per
+   * column, in record order. The set of columns is taken from the first record.
+   *
+   * @param records the records to convert; may be empty
+   * @return the values of each column keyed by column name, sorted by key;
+   *         an empty map when {@code records} is empty
+   */
+  public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) {
+    Map<String, List<Object>> ret = new TreeMap<>();
+    // An empty record list has no first record to take the columns from.
+    if (records.isEmpty()) {
+      return ret;
+    }
+    for (String s : records.get(0).keySet()) {
+      ret.put(s, new ArrayList<>());
+    }
+    for (Map<String, Object> m : records) {
+      for (Map.Entry<String, Object> e : m.entrySet()) {
+        ret.get(e.getKey()).add(e.getValue());
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Runs the test query and the builder's validation query, gathers each
+   * result into hyper vectors and compares them in order.
+   * <p>
+   * The batches of both queries are released on every exit path, including
+   * when a query or the comparison fails.
+   *
+   * @throws Exception if a query fails or the results differ
+   */
+  public void compareResultsHyperVector() throws Exception {
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+
+    List<QueryDataBatch> results = Collections.emptyList();
+    List<QueryDataBatch> expected = Collections.emptyList();
+
+    try {
+      test(testOptionSettingQueries);
+      results = testRunAndReturn(queryType, query);
+
+      checkNumBatches(results);
+
+      // To avoid extra work for test writers, types can optionally be inferred from the test query
+      addTypeInfoIfMissing(results.get(0), testBuilder);
+
+      Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader);
+
+      test(baselineOptionSettingQueries);
+      expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+
+      Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader);
+
+      compareHyperVectors(expectedSuperVectors, actualSuperVectors);
+    } finally {
+      // Release whatever was fetched, matching the other compare methods.
+      cleanupBatches(results, expected);
+    }
+  }
+
+  /**
+   * Asserts that the query returned the expected number of batches. Does
+   * nothing when no expected batch count was set.
+   *
+   * @param results the batches returned by the query
+   */
+  private void checkNumBatches(final List<QueryDataBatch> results) {
+    if (expectedNumBatches == EXPECTED_BATCH_COUNT_NOT_SET) {
+      return;
+    }
+    final int actualNumBatches = results.size();
+    final String message = String.format("Expected %d batches but query returned %d non empty batch(es)%n",
+        expectedNumBatches, actualNumBatches);
+    assertEquals(message, expectedNumBatches, actualNumBatches);
+  }
+
+  /**
+   * Supplies the test builder with baseline types read from the given batch
+   * when the builder reports that no type information has been set.
+   *
+   * @param batch the batch whose field definitions provide the types
+   * @param testBuilder the builder to update
+   */
+  private void addTypeInfoIfMissing(QueryDataBatch batch, TestBuilder testBuilder) {
+    if (testBuilder.typeInfoSet()) {
+      return;
+    }
+    testBuilder.baselineTypes(getTypeMapFromBatch(batch));
+  }
+
+  /**
+   * Builds a map from each field's simple schema path to its major type, read
+   * from the batch header's definition.
+   *
+   * @param batch the batch whose header definition is inspected
+   * @return the major type of every field, keyed by schema path
+   */
+  private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryDataBatch batch) {
+    final Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    final int fieldCount = batch.getHeader().getDef().getFieldCount();
+    for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
+      final SchemaPath path = SchemaPath.getSimplePath(
+          MaterializedField.create(batch.getHeader().getDef().getField(fieldIndex)).getName());
+      final TypeProtos.MajorType type = batch.getHeader().getDef().getField(fieldIndex).getMajorType();
+      typeMap.put(path, type);
+    }
+    return typeMap;
+  }
+
+  /**
+   * Releases every batch in each of the given lists.
+   *
+   * @param results the lists of batches to release
+   */
+  @SafeVarargs
+  private final void cleanupBatches(List<QueryDataBatch>... results) {
+    for (int listIndex = 0; listIndex < results.length; listIndex++) {
+      for (QueryDataBatch batch : results[listIndex]) {
+        batch.release();
+      }
+    }
+  }
+
+  /**
+   * Loads each batch and appends one map per record (column path expression to
+   * value) to {@code materializedRecords}. {@code Text} values are stored as
+   * Strings; null values are stored as null.
+   * <p>
+   * WARNING: {@code records} is consumed. Each batch is removed from the list
+   * and released once its records have been copied, and the loader is cleared
+   * after each batch.
+   *
+   * @param materializedRecords the list that receives the records
+   * @param records the batches to read; emptied by this method
+   * @param loader the loader used to materialize each batch
+   */
+  public static void addToMaterializedResults(List<Map<String, Object>> materializedRecords,
+                                          List<QueryDataBatch> records,
+                                          RecordBatchLoader loader)
+      throws SchemaChangeException, UnsupportedEncodingException {
+    long totalRecords = 0;
+    QueryDataBatch batch;
+    int size = records.size();
+    for (int i = 0; i < size; i++) {
+      // Always take the head: the batch is removed from the list once processed.
+      batch = records.get(0);
+      loader.load(batch.getHeader().getDef(), batch.getData());
+      // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+      // SchemaChangeException, so check/clean throws clause above.
+      logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
+      totalRecords += loader.getRecordCount();
+      for (int j = 0; j < loader.getRecordCount(); j++) {
+        Map<String, Object> record = new TreeMap<>();
+        for (VectorWrapper<?> w : loader) {
+          Object obj = w.getValueVector().getAccessor().getObject(j);
+          // instanceof is false for null, so no separate null check is needed.
+          if (obj instanceof Text) {
+            obj = obj.toString();
+          }
+          // Single put; the value was previously stored twice under the same key.
+          record.put(SchemaPath.getSimplePath(w.getField().getName()).toExpr(), obj);
+        }
+        materializedRecords.add(record);
+      }
+      records.remove(0);
+      batch.release();
+      loader.clear();
+    }
+  }
+
+  /**
+   * Compares an expected value with an actual value and throws a descriptive
+   * exception when they differ.
+   *
+   * @param expected the baseline value, may be null
+   * @param actual the value returned by the test query, may be null
+   * @param counter position of the record, used in the error message
+   * @param column name of the column, used in the error message
+   * @return {@code true} when the values match
+   * @throws Exception describing the mismatch when the values differ
+   */
+  public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {
+
+    if (compareValues(expected, actual, counter, column)) {
+      return true;
+    }
+    if (expected == null) {
+      throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: null " +
+        "but received " + actual + "(" + actual.getClass().getSimpleName() + ")");
+    }
+    if (actual == null) {
+      throw new Exception("unexpected null at position " + counter + " column '" + column + "' should have been:  " + expected);
+    }
+    // Decode as text only when both sides are byte arrays; otherwise fall through
+    // to the generic message instead of failing on the cast.
+    if (expected instanceof byte[] && actual instanceof byte[]) {
+      throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: "
+        + new String((byte[])expected, "UTF-8") + " but received " + new String((byte[])actual, "UTF-8"));
+    }
+    if (!expected.equals(actual)) {
+      throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: "
+        + expected + "(" + expected.getClass().getSimpleName() + ") but received " + actual + "(" + actual.getClass().getSimpleName() + ")");
+    }
+    return true;
+  }
+
+  /**
+   * Tells whether an expected value and an actual value are equal. Two nulls
+   * are equal; byte arrays are compared by content; all other values are
+   * compared with {@code equals}. A byte array never equals a value of a
+   * different type. Matches are logged when {@code VERBOSE_DEBUG} is set.
+   *
+   * @param expected the baseline value, may be null
+   * @param actual the value returned by the test query, may be null
+   * @param counter position of the record, used for logging
+   * @param column name of the column, used for logging
+   * @return {@code true} when the values are equal, {@code false} otherwise
+   */
+  public static boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception {
+    if (expected == null) {
+      if (actual != null) {
+        return false;
+      }
+      if (VERBOSE_DEBUG) {
+        logger.debug("(1) at position " + counter + " column '" + column + "' matched value:  " + expected );
+      }
+      return true;
+    }
+    if (actual == null) {
+      return false;
+    }
+    if (actual instanceof byte[]) {
+      // Guard the cast: a non-array expected value is a mismatch, not a ClassCastException.
+      if (!(expected instanceof byte[]) || !Arrays.equals((byte[]) expected, (byte[]) actual)) {
+        return false;
+      }
+      if (VERBOSE_DEBUG) {
+        logger.debug("at position " + counter + " column '" + column + "' matched value " + new String((byte[])expected, "UTF-8"));
+      }
+      return true;
+    }
+    if (!expected.equals(actual)) {
+      return false;
+    }
+    if (VERBOSE_DEBUG) {
+      logger.debug("at position " + counter + " column '" + column + "' matched value:  " + expected );
+    }
+    return true;
+  }
+
+  /**
+   * Compare two result sets, ignoring ordering. For each expected record the
+   * first matching actual record is located and removed, so every actual
+   * record can satisfy at most one expected record.
+   *
+   * @param expectedRecords - list of records from baseline
+   * @param actualRecords - list of records from test query, WARNING - this list is destroyed in this method
+   * @throws Exception if an expected record has no match, or if a compared
+   *                   record has an unexpected or a missing column
+   */
+  private void compareResults(List<Map<String, Object>> expectedRecords, List<Map<String, Object>> actualRecords) throws Exception {
+    assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size());
+
+    int counter = 0;
+    for (Map<String, Object> expectedRecord : expectedRecords) {
+      int matchIndex = -1;
+      for (int candidate = 0; candidate < actualRecords.size(); candidate++) {
+        if (recordMatches(expectedRecord, actualRecords.get(candidate), counter)) {
+          matchIndex = candidate;
+          break;
+        }
+      }
+      if (matchIndex < 0) {
+        String expectedRecordExamples = printFirstRecords(expectedRecords);
+        String actualRecordExamples = printFirstRecords(actualRecords);
+        throw new Exception(String.format("After matching %d records, did not find expected record in result set:\n %s\n\n" +
+            "Some examples of expected records:\n%s\n\n Some examples of records returned by the test query:\n%s",
+            counter, printRecord(expectedRecord), expectedRecordExamples, actualRecordExamples));
+      }
+      actualRecords.remove(matchIndex);
+      counter++;
+    }
+    assertEquals(0, actualRecords.size());
+  }
+
+  /**
+   * Tells whether every column of the actual record equals the same column of
+   * the expected record. Throws when the actual record has a column the
+   * expected record lacks, or when all its columns match but it has fewer
+   * columns than the expected record.
+   */
+  private boolean recordMatches(Map<String, Object> expectedRecord, Map<String, Object> actualRecord, int counter) throws Exception {
+    for (String s : actualRecord.keySet()) {
+      if (!expectedRecord.containsKey(s)) {
+        throw new Exception("Unexpected column '" + s + "' returned by query.");
+      }
+      if (!compareValues(expectedRecord.get(s), actualRecord.get(s), counter, s)) {
+        return false;
+      }
+    }
+    if (actualRecord.size() < expectedRecord.size()) {
+      throw new Exception(findMissingColumns(expectedRecord.keySet(), actualRecord.keySet()));
+    }
+    return true;
+  }
+
+  /** Renders at most the first ten records of the list, one per line. */
+  private String printFirstRecords(List<Map<String, Object>> recordList) {
+    StringBuilder sb = new StringBuilder();
+    for (int shown = 0; shown < 10 && shown < recordList.size(); shown++) {
+      sb.append(printRecord(recordList.get(shown)));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Builds the error message listing the expected columns that are absent
+   * from the actual result set.
+   *
+   * @param expected names of the expected columns
+   * @param actual names of the columns actually returned
+   * @return the message naming each missing column and the actual column set
+   */
+  private static String findMissingColumns(Set<String> expected, Set<String> actual) {
+    final StringBuilder missingCols = new StringBuilder();
+    for (String colName : expected) {
+      if (actual.contains(colName)) {
+        continue;
+      }
+      missingCols.append(colName).append(", ");
+    }
+    return "Expected column(s) " + missingCols + " not found in result set: " + actual + ".";
+  }
+
+  /**
+   * Renders a record as {@code "column : value, "} pairs in the map's
+   * iteration order, terminated by a newline.
+   *
+   * @param record the record to render
+   * @return the textual form of the record
+   */
+  private String printRecord(Map<String, ?> record) {
+    final StringBuilder text = new StringBuilder();
+    for (Map.Entry<String, ?> column : record.entrySet()) {
+      text.append(column.getKey()).append(" : ").append(column.getValue()).append(", ");
+    }
+    return text.append("\n").toString();
+  }
+
+  /**
+   * Passes the given query text to the test {@code services}.
+   *
+   * @param query the query text handed to {@code services.test}
+   * @throws Exception if the services call fails
+   */
+  private void test(String query) throws Exception {
+    services.test(query);
+  }
+
+  /**
+   * Runs a query through the test {@code services} and returns its batches.
+   *
+   * @param type the type of the query
+   * @param query the query to run
+   * @return the batches returned by the services
+   * @throws Exception if the services call fails
+   */
+  private List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception {
+    final List<QueryDataBatch> batches = services.testRunAndReturn(type, query);
+    return batches;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
index 67ae4a3..8366b7a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
@@ -24,12 +24,21 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.LogFixture.LogFixtureBuilder;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.drill.test.rowSet.file.JsonFileBuilder;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 import ch.qos.logback.classic.Level;
@@ -50,7 +59,6 @@ import ch.qos.logback.classic.Level;
  * you can create your cluster fixture in a JUnit <tt>{@literal @}Before</tt>
  * method, and shut it down in <tt>{@literal @}After</tt> method.
  * <p>
- * See {@link org.apache.drill.test.package_info the package overview} for 
details.
  */
 
 // Note: Test itself is ignored because this is an example, not a
@@ -60,6 +68,15 @@ import ch.qos.logback.classic.Level;
 public class ExampleTest {
 
   /**
+   * This test watcher creates all the temp directories that are required for 
an integration test with a Drillbit. The
+   * {@link ClusterFixture} and {@link BaseTestQuery} classes automatically 
configure their Drillbits to use the temp
+   * directories created by this test watcher. Please see {@link 
BaseDirTestWatcher} and package-info.java. Please see
+   * {@link #secondTest()} for an example.
+   */
+  @Rule
+  public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  /**
    * Example of the simplest possible test case: set up a default
    * cluster (with one Drillbit), a corresponding client, run a
    * query and print the results.
@@ -69,38 +86,61 @@ public class ExampleTest {
 
   @Test
   public void firstTest() throws Exception {
-    try (ClusterFixture cluster = ClusterFixture.standardCluster();
+    try (ClusterFixture cluster = 
ClusterFixture.standardCluster(dirTestWatcher);
          ClientFixture client = cluster.clientFixture()) {
       client.queryBuilder().sql("SELECT * FROM `cp`.`employee.json` LIMIT 
10").printCsv();
     }
   }
 
   /**
-   * Example that uses the fixture builder to build a cluster fixture. Lets
-   * you set configuration (boot-time) options, session options, system options
-   * and more.
    * <p>
-   * Also shows how to display the plan JSON and just run a query silently,
-   * getting just the row count, batch count and run time.
-   *
+   *   Example that uses the fixture builder to build a cluster fixture. Lets
+   *   you set configuration (boot-time) options, session options, system 
options
+   *   and more.
+   * </p>
+   * <p>
+   *   You can write test files to the {@link BaseDirTestWatcher#getRootDir()} 
and query them in the test.
+   * </p>
+   * <p>
+   *   Also shows how to display the plan JSON and just run a query silently,
+   *   getting just the row count, batch count and run time.
+   * </p>
    * @throws Exception if anything goes wrong
    */
 
   @Test
   public void secondTest() throws Exception {
-    ClusterFixtureBuilder builder = ClusterFixture.builder()
-        .configProperty(ExecConstants.SLICE_TARGET, 10)
-        ;
+    try (RootAllocator allocator = new RootAllocator(100_000_000)) {
+      final File tableFile = dirTestWatcher
+        .getRootDir()
+        .toPath()
+        .resolve("employee.json")
+        .toFile();
 
-    try (ClusterFixture cluster = builder.build();
-         ClientFixture client = cluster.clientFixture()) {
-      String sql = "SELECT * FROM `cp`.`employee.json` LIMIT 10";
-      System.out.println( client.queryBuilder().sql(sql).explainJson() );
-      QuerySummary results = client.queryBuilder().sql(sql).run();
-      System.out.println(String.format("Read %d rows", results.recordCount()));
-      // Usually we want to test something. Here, just test that we got
-      // the 10 records.
-      assertEquals(10, results.recordCount());
+      final BatchSchema schema = new SchemaBuilder()
+        .add("id", Types.required(TypeProtos.MinorType.VARCHAR))
+        .add("name", Types.required(TypeProtos.MinorType.VARCHAR))
+        .build();
+
+      final RowSet rowSet = new RowSetBuilder(allocator, schema)
+        .add("1", "kiwi")
+        .add("2", "watermelon")
+        .build();
+
+      new JsonFileBuilder(rowSet).build(tableFile);
+      rowSet.clear();
+
+      ClusterFixtureBuilder builder = 
ClusterFixture.builder(dirTestWatcher).configProperty(ExecConstants.SLICE_TARGET,
 10);
+
+      try (ClusterFixture cluster = builder.build(); ClientFixture client = 
cluster.clientFixture()) {
+        String sql = "SELECT * FROM `dfs`.`test/employee.json`";
+        System.out.println(client.queryBuilder().sql(sql).explainJson());
+        QuerySummary results = client.queryBuilder().sql(sql).run();
+        System.out.println(String.format("Read %d rows", 
results.recordCount()));
+        // Usually we want to test something. Here, just test that we got
+        // the 2 records.
+        assertEquals(2, results.recordCount());
+      }
     }
   }
 
@@ -125,7 +165,7 @@ public class ExampleTest {
 
   @Test
   public void thirdTest() throws Exception {
-    try (ClusterFixture cluster = ClusterFixture.standardCluster();
+    try (ClusterFixture cluster = 
ClusterFixture.standardCluster(dirTestWatcher);
          ClientFixture client = cluster.clientFixture()) {
       String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_5`";
       client.queryBuilder().sql(sql).printCsv();
@@ -157,16 +197,15 @@ public class ExampleTest {
         // All debug messages in the xsort package
         .logger("org.apache.drill.exec.physical.impl.xsort", Level.DEBUG)
         // And trace messages for one class.
-        .logger(ExternalSortBatch.class, Level.TRACE)
-        ;
-    ClusterFixtureBuilder builder = ClusterFixture.builder()
+        .logger(ExternalSortBatch.class, Level.TRACE);
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
         // Easy way to run single threaded for easy debugging
         .maxParallelization(1)
         // Set some session options
         .sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2L * 1024 
* 1024 * 1024)
         .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
-        .sessionOption(PlannerSettings.HASHAGG.getOptionName(), false)
-        ;
+        .sessionOption(PlannerSettings.HASHAGG.getOptionName(), false);
 
     try (LogFixture logs = logBuilder.build();
          ClusterFixture cluster = builder.build();
@@ -200,13 +239,11 @@ public class ExampleTest {
    *
    * @throws Exception if anything goes wrong
    */
-
   @Test
   public void fifthTest() throws Exception {
-    ClusterFixtureBuilder builder = ClusterFixture.builder()
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
         .maxParallelization(1)
-        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, 
true)
-        ;
+        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, 
true);
 
     try (ClusterFixture cluster = builder.build();
          ClientFixture client = cluster.clientFixture()) {
@@ -231,7 +268,6 @@ public class ExampleTest {
    *
    * @param args not used
    */
-
   public static void main(String args) {
     try {
       new ExampleTest().firstTest();

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java
new file mode 100644
index 0000000..ec6c91e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+
+/**
+ * Test helpers for converting between {@link java.nio.file.Path} and
+ * {@link org.apache.hadoop.fs.Path}.
+ */
+public class HadoopUtils {
+  public static final String LOCAL_FS_SCHEME = "file://";
+
+  /** Bare URI scheme name used when rebuilding a local file URI from its parts. */
+  private static final String LOCAL_FS_SCHEME_NAME = "file";
+
+  /**
+   * Converts a Java NIO path into a Hadoop path by way of its URI.
+   *
+   * @param javaPath the Java path to convert
+   * @return a Hadoop path constructed from {@code javaPath.toUri()}
+   */
+  public static org.apache.hadoop.fs.Path javaToHadoopPath(java.nio.file.Path javaPath) {
+    return new org.apache.hadoop.fs.Path(javaPath.toUri());
+  }
+
+  /**
+   * Converts a Hadoop path into a Java NIO path on the local file system.
+   *
+   * <p>Only the path component of the Hadoop URI is used; any scheme or
+   * authority carried by {@code hadoopPath} is dropped and replaced with the
+   * local {@code file} scheme.
+   *
+   * @param hadoopPath the Hadoop path to convert
+   * @return the equivalent Java path
+   * @throws IllegalArgumentException if the path component cannot be expressed
+   *   as a {@code file} URI (for example, because it is not absolute)
+   */
+  public static java.nio.file.Path hadoopToJavaPath(org.apache.hadoop.fs.Path hadoopPath) {
+    // getPath() returns the decoded path, so it may contain spaces, '%' and
+    // other characters that are not legal in a raw URI string.
+    final String pathString = hadoopPath.toUri().getPath();
+    final URI uri;
+
+    try {
+      // The multi-argument constructor quotes illegal characters, unlike
+      // parsing a concatenated "file://" + path string, which fails on them.
+      uri = new URI(LOCAL_FS_SCHEME_NAME, null, pathString, null);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Cannot convert Hadoop path to a local path: " + hadoopPath, e);
+    }
+
+    return Paths.get(uri);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index e7bf61f..58f888d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.drill.PlanTestBase;
-import org.apache.drill.QueryTestUtil;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;

http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
new file mode 100644
index 0000000..db3e2ba
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.drill.test.BaseTestQuery.SilentListener;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.client.PrintingResultsListener;
+import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.compile.ClassTransformer;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.util.VectorUtil;
+
+/**
+ * Utilities useful for tests that issue SQL queries.
+ *
+ * <p>All members are static; the class cannot be instantiated.
+ */
+public class QueryTestUtil {
+
+  public static final String TEST_QUERY_PRINTING_SILENT = "drill.test.query.printing.silent";
+
+  /**
+   * Matches either spelling of the working-path placeholder,
+   * <code>${WORKING_PATH}</code> or <code>[WORKING_PATH]</code>, literally.
+   */
+  private static final Pattern WORKING_PATH_PLACEHOLDER = Pattern.compile(
+      Pattern.quote("${WORKING_PATH}") + "|" + Pattern.quote("[WORKING_PATH]"));
+
+  /**
+   * Constructor. All methods are static.
+   */
+  private QueryTestUtil() {
+  }
+
+  /**
+   * Create a DrillClient that can be used to query a drill cluster.
+   *
+   * <p>The client is connected and its session's
+   * {@link ExecConstants#MAX_WIDTH_PER_NODE_KEY} option is set to
+   * {@code maxWidth} before it is returned. If either step fails, the client
+   * is closed before the exception propagates.
+   *
+   * @param drillConfig configuration used to construct the client
+   * @param remoteServiceSet remote service set
+   * @param maxWidth maximum width per node
+   * @param props Connection properties contains properties such as "user", "password", "schema" etc
+   * @return the newly created client
+   * @throws RpcException if there is a problem setting up the client
+   */
+  public static DrillClient createClient(final DrillConfig drillConfig, final RemoteServiceSet remoteServiceSet,
+      final int maxWidth, final Properties props) throws RpcException, OutOfMemoryException {
+    final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
+
+    boolean ready = false;
+    try {
+      drillClient.connect(props);
+
+      final List<QueryDataBatch> results = drillClient.runQuery(
+          QueryType.SQL, String.format("alter session set `%s` = %d",
+              ExecConstants.MAX_WIDTH_PER_NODE_KEY, maxWidth));
+      for (QueryDataBatch queryDataBatch : results) {
+        queryDataBatch.release();
+      }
+      ready = true;
+    } finally {
+      // Do not leak a half-initialized client when connecting or configuring
+      // the session fails.
+      if (!ready) {
+        drillClient.close();
+      }
+    }
+
+    return drillClient;
+  }
+
+  /**
+   * Normalize the query relative to the test environment.
+   *
+   * <p>Looks for the placeholders "${WORKING_PATH}" and "[WORKING_PATH]" in the
+   * query string and replaces every occurrence of either with the current
+   * working path obtained from {@link TestTools#WORKING_PATH}. A query without
+   * placeholders is returned unchanged.
+   *
+   * @param query the query string
+   * @return the normalized query string
+   */
+  public static String normalizeQuery(final String query) {
+    final Matcher placeholders = WORKING_PATH_PLACEHOLDER.matcher(query);
+    if (!placeholders.find()) {
+      return query;
+    }
+    // replaceAll() resets the matcher, so the find() above does not skip a match.
+    // The replacement is quoted because a path may contain '\' or '$'.
+    return placeholders.replaceAll(Matcher.quoteReplacement(TestTools.WORKING_PATH.toString()));
+  }
+
+  /**
+   * Execute a SQL query, and print the results.
+   *
+   * <p>Printing is suppressed when the client's configuration sets
+   * {@link #TEST_QUERY_PRINTING_SILENT} to true.
+   *
+   * @param drillClient drill client to use
+   * @param type type of the query
+   * @param queryString query string
+   * @return number of rows returned
+   * @throws Exception if running the query or awaiting its results fails
+   */
+  public static int testRunAndPrint(
+      final DrillClient drillClient, final QueryType type, final String queryString) throws Exception {
+    final String query = normalizeQuery(queryString);
+    DrillConfig config = drillClient.getConfig();
+    AwaitableUserResultsListener resultListener =
+        new AwaitableUserResultsListener(
+            config.getBoolean(TEST_QUERY_PRINTING_SILENT) ?
+                new SilentListener() :
+                new PrintingResultsListener(config, Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH)
+        );
+    drillClient.runQuery(type, query, resultListener);
+    return resultListener.await();
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and print the results.
+   *
+   * <p>The string is split on every ';' character; empty statements are skipped.
+   *
+   * @param drillClient drill client to use
+   * @param queryString the query string
+   * @throws Exception if any of the queries fails
+   */
+  public static void test(final DrillClient drillClient, final String queryString) throws Exception {
+    final String query = normalizeQuery(queryString);
+    String[] queries = query.split(";");
+    for (String q : queries) {
+      final String trimmedQuery = q.trim();
+      if (trimmedQuery.isEmpty()) {
+        continue;
+      }
+      testRunAndPrint(drillClient, QueryType.SQL, trimmedQuery);
+    }
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and print the results, with the option to
+   * add formatted arguments to the query string.
+   *
+   * @param drillClient drill client to use
+   * @param query the query string; may contain formatting specifications to be used by
+   *   {@link String#format(String, Object...)}.
+   * @param args optional args to use in the formatting call for the query string
+   * @throws Exception if any of the queries fails
+   */
+  public static void test(final DrillClient drillClient, final String query, Object... args) throws Exception {
+    test(drillClient, String.format(query, args));
+  }
+
+  /**
+   * Execute a single query with a user supplied result listener.
+   *
+   * <p>The call only submits the query; results are delivered to the listener.
+   *
+   * @param drillClient drill client to use
+   * @param type type of query
+   * @param queryString the query string
+   * @param resultListener the result listener
+   */
+  public static void testWithListener(final DrillClient drillClient, final QueryType type,
+      final String queryString, final UserResultsListener resultListener) {
+    final String query = QueryTestUtil.normalizeQuery(queryString);
+    drillClient.runQuery(type, query, resultListener);
+  }
+
+  /**
+   * Set up the options to test the scalar replacement retry option (see
+   * ClassTransformer.java). Scalar replacement rewrites bytecode to replace
+   * value holders (essentially boxed values) with their member variables as
+   * locals. There is still one pattern that doesn't work, and occasionally new
+   * ones are introduced. This can be used in tests that exercise failing patterns.
+   *
+   * <p>This also flushes the compiled code cache.
+   *
+   * @param drillbit the drillbit
+   * @param srOption the scalar replacement option value to use
+   * @return the original scalar replacement option setting (so it can be restored)
+   */
+  @SuppressWarnings("resource")
+  public static OptionValue setupScalarReplacementOption(
+      final Drillbit drillbit, final ClassTransformer.ScalarReplacementOption srOption) {
+    // set the system option
+    final DrillbitContext drillbitContext = drillbit.getContext();
+    final SystemOptionManager optionManager = drillbitContext.getOptionManager();
+    final OptionValue originalOptionValue = optionManager.getOption(ClassTransformer.SCALAR_REPLACEMENT_OPTION);
+    optionManager.setLocalOption(ClassTransformer.SCALAR_REPLACEMENT_OPTION, srOption.name().toLowerCase());
+
+    // flush the code cache
+    drillbitContext.getCompiler().flushCache();
+
+    return originalOptionValue;
+  }
+
+  /**
+   * Restore the original scalar replacement option returned from
+   * setupScalarReplacementOption().
+   *
+   * <p>This also flushes the compiled code cache.
+   *
+   * @param drillbit the drillbit
+   * @param srOption the scalar replacement option value to restore
+   */
+  public static void restoreScalarReplacementOption(final Drillbit drillbit, final String srOption) {
+    @SuppressWarnings("resource")
+    final DrillbitContext drillbitContext = drillbit.getContext();
+    @SuppressWarnings("resource")
+    final OptionManager optionManager = drillbitContext.getOptionManager();
+    optionManager.setLocalOption(ClassTransformer.SCALAR_REPLACEMENT_OPTION, srOption);
+
+    // flush the code cache
+    drillbitContext.getCompiler().flushCache();
+  }
+
+}

Reply via email to