http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index f36c193..f0653f7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URL; +import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -31,11 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.commons.io.FileUtils; -import org.apache.drill.BaseTestQuery; -import org.apache.drill.DrillTestWrapper.TestServices; -import org.apache.drill.QueryTestUtil; -import org.apache.drill.TestBuilder; +import org.apache.drill.test.DrillTestWrapper.TestServices; import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.FormatPluginConfig; @@ -57,22 +54,24 @@ import org.apache.drill.exec.store.dfs.WorkspaceConfig; import org.apache.drill.exec.store.mock.MockStorageEngine; import org.apache.drill.exec.store.mock.MockStorageEngineConfig; import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; -import org.apache.drill.exec.util.TestUtilities; +import org.apache.drill.exec.util.StoragePluginTestUtils; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.io.Resources; +import static org.apache.drill.exec.util.StoragePluginTestUtils.DEFAULT_SCHEMA; +import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA; +import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA; +import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA; + /** * Test fixture to start a Drillbit with provide options, create a client, and * execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides * a builder to set the necessary embedded Drillbit and client options, then * creates the requested Drillbit and client. */ - public class ClusterFixture extends BaseFixture implements AutoCloseable { - // private static final org.slf4j.Logger logger = - // org.slf4j.LoggerFactory.getLogger(ClientFixture.class); public static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache"; public static final int MAX_WIDTH_PER_NODE = 2; @@ -87,10 +86,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { // The CTTAS function requires that the default temporary workspace be // writable. By default, the default temporary workspace points to // dfs.tmp. But, the test setup marks dfs.tmp as read-only. To work - // around this, tests are supposed to use dfs_test. So, we need to - // set the default temporary workspace to dfs_test.tmp. + // around this, tests are supposed to use dfs. So, we need to + // set the default temporary workspace to dfs.tmp. 
- put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, BaseTestQuery.TEMP_SCHEMA); + put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, DFS_TMP_SCHEMA); put(ExecConstants.HTTP_ENABLE, false); put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, true); put("drill.catastrophic_to_standard_out", true); @@ -121,7 +120,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { // mode. put(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT, "/tmp/drill/tests"); - put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, "file:/tmp/drill/tests"); } }; @@ -132,30 +130,22 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { private boolean ownsZK; private ZookeeperHelper zkHelper; private RemoteServiceSet serviceSet; - private File dfsTestTempDir; protected List<ClientFixture> clients = new ArrayList<>(); protected RestClientFixture restClientFixture; private boolean usesZk; - private boolean preserveLocalFiles; - private boolean isLocal; private Properties clientProps; - - /** - * Temporary directories created for this test cluster. - * Each is removed when closing the cluster. - */ - - private List<File> tempDirs = new ArrayList<>(); + private final ClusterFixtureBuilder builder; ClusterFixture(ClusterFixtureBuilder builder) { + this.builder = Preconditions.checkNotNull(builder); - setClientProps(builder); - configureZk(builder); + setClientProps(); + configureZk(); try { - createConfig(builder); + createConfig(); allocator = RootAllocatorFactory.newRoot(config); - startDrillbits(builder); - applyOptions(builder); + startDrillbits(); + applyOptions(); } catch (Exception e) { // Translate exceptions to unchecked to avoid cluttering // tests. Failures will simply fail the test itself. @@ -166,9 +156,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { /** * Set the client properties to be used by client fixture. - * @param builder {@link ClusterFixtureBuilder#clientProps} */ - private void setClientProps(ClusterFixtureBuilder builder) { + private void setClientProps() { clientProps = builder.clientProps; } @@ -176,8 +165,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { return clientProps; } - private void configureZk(ClusterFixtureBuilder builder) { - + private void configureZk() { // Start ZK if requested. String zkConnect = null; @@ -213,8 +201,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } } - private void createConfig(ClusterFixtureBuilder builder) throws Exception { - + private void createConfig() throws Exception { // Create a config // Because of the way DrillConfig works, we can set the ZK // connection string only if a property set is provided. @@ -226,44 +213,14 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { serviceSet = null; usesZk = true; - isLocal = false; } else { // Embedded Drillbit. serviceSet = RemoteServiceSet.getLocalServiceSet(); - isLocal = true; } } - private void startDrillbits(ClusterFixtureBuilder builder) throws Exception { -// // Ensure that Drill uses the log directory determined here rather than -// // it's hard-coded defaults. WIP: seems to be needed some times but -// // not others. 
-// -// String logDir = null; -// if (builder.tempDir != null) { -// logDir = builder.tempDir.getAbsolutePath(); -// } -// if (logDir == null) { -// logDir = config.getString(ExecConstants.DRILL_TMP_DIR); -// if (logDir != null) { -// logDir += "/drill/log"; -// } -// } -// if (logDir == null) { -// logDir = "/tmp/drill"; -// } -// new File(logDir).mkdirs(); -// System.setProperty("drill.log-dir", logDir); - - dfsTestTempDir = makeTempDir("dfs-test"); - - // Clean up any files that may have been left from the - // last run. - - preserveLocalFiles = builder.preserveLocalFiles; - removeLocalFiles(); - + private void startDrillbits() throws Exception { // Start the Drillbits. Preconditions.checkArgument(builder.bitCount > 0); @@ -302,12 +259,16 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } private void configureStoragePlugins(Drillbit bit) throws Exception { - // Create the dfs_test name space + // Create the dfs name space + builder.dirTestWatcher.newDfsTestTmpDir(); @SuppressWarnings("resource") final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage(); - TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTempDir.getAbsolutePath()); - TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry); + StoragePluginTestUtils.configureFormatPlugins(pluginRegistry); + + StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry, builder.dirTestWatcher.getDfsTestTmpDir(), TMP_SCHEMA); + StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry, builder.dirTestWatcher.getRootDir(), ROOT_SCHEMA); + StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry, builder.dirTestWatcher.getRootDir(), DEFAULT_SCHEMA); // Create the mock data plugin @@ -319,8 +280,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(MockStorageEngineConfig.NAME, config, plugin); } - private void applyOptions(ClusterFixtureBuilder builder) throws Exception { - + private void applyOptions() throws Exception { // Apply system options if (builder.systemOptions != null) { @@ -342,7 +302,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { public Drillbit drillbit(String name) { return bits.get(name); } public Collection<Drillbit> drillbits() { return bits.values(); } public RemoteServiceSet serviceSet() { return serviceSet; } - public File getDfsTestTmpDir() { return dfsTestTempDir; } public ClientFixture.ClientBuilder clientBuilder() { return new ClientFixture.ClientBuilder(this); @@ -442,74 +401,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } } zkHelper = null; - - // Delete any local files, if we wrote to the local - // persistent store. But, leave the files if the user wants - // to review them, for debugging, say. Note that, even if the - // files are preserved here, they will be removed when the - // next cluster fixture starts, else the CTTAS initialization - // will fail. - - if (! preserveLocalFiles) { - try { - removeLocalFiles(); - } catch (Exception e) { - ex = ex == null ? e : ex; - } - } - - // Remove temporary directories created for this cluster session. - - try { - removeTempDirs(); - } catch (Exception e) { - ex = ex == null ? e : ex; - } - if (ex != null) { - throw ex; - } - } - - /** - * Removes files stored locally in the "local store provider." 
- * Required because CTTAS setup fails if these files are left from one - * run to the next. - * - * @throws IOException if a directory cannot be deleted - */ - - private void removeLocalFiles() throws IOException { - - // Don't delete if this is not a local Drillbit. - - if (! isLocal) { - return; - } - - // Remove the local files if they exist. - - String localStoreLocation = config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH); - removeDir(new File(localStoreLocation)); - } - - private void removeTempDirs() throws IOException { - IOException ex = null; - for (File dir : tempDirs) { - try { - removeDir(dir); - } catch (IOException e) { - ex = ex == null ? e : ex; - } - } - if (ex != null) { - throw ex; - } - } - - public void removeDir(File dir) throws IOException { - if (dir.exists()) { - FileUtils.deleteDirectory(dir); - } } /** @@ -537,7 +428,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { * Define a workspace within an existing storage plugin. Useful for * pointing to local file system files outside the Drill source tree. * - * @param pluginName name of the plugin like "dfs" or "dfs_test". + * @param pluginName name of the plugin like "dfs". * @param schemaName name of the new schema * @param path directory location (usually local) * @param defaultFormat default format for files in the schema @@ -584,11 +475,14 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { public static final String EXPLAIN_PLAN_TEXT = "text"; public static final String EXPLAIN_PLAN_JSON = "json"; - public static ClusterFixtureBuilder builder() { - ClusterFixtureBuilder builder = new ClusterFixtureBuilder() + public static ClusterFixtureBuilder builder(BaseDirTestWatcher dirTestWatcher) { + ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher) .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE); Properties props = new Properties(); props.putAll(ClusterFixture.TEST_CONFIGURATIONS); + props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath()); + props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath()); + builder.configBuilder.configProps(props); return builder; } @@ -603,9 +497,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { * * @return a fixture builder with no default properties set */ - - public static ClusterFixtureBuilder bareBuilder() { - return new ClusterFixtureBuilder(); + public static ClusterFixtureBuilder bareBuilder(BaseDirTestWatcher dirTestWatcher) { + return new ClusterFixtureBuilder(dirTestWatcher); } /** @@ -645,8 +538,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { * @return a cluster fixture with standard options * @throws Exception if something goes wrong */ - public static ClusterFixture standardCluster() { - return builder().build(); + public static ClusterFixture standardCluster(BaseDirTestWatcher dirTestWatcher) { + return builder(dirTestWatcher).build(); } /** @@ -660,7 +553,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { public static String stringify(Object value) { if (value instanceof String) { - return "'" + (String) value + "'"; + return "'" + value + "'"; } else { return value.toString(); } @@ -714,34 +607,20 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } /** - * Create a temporary directory which will be removed when the - * cluster closes. 
- * - * @param dirName the name of the leaf directory - * @return the path to the temporary directory which is usually - * under the temporary directory structure for this machine - */ - - public File makeTempDir(final String dirName) { - File dir = getTempDir(dirName); - tempDirs.add(dir); - return dir; - } - - /** * Create a temporary data directory which will be removed when the * cluster closes, and register it as a "dfs" name space. * - * @param key the name to use for the directory and the name space. + * @param key The name to use for the directory and the name space. * Access the directory as "dfs.<key>". - * @param defaultFormat default storage format for the workspace + * @param defaultFormat The default storage format for the workspace. + * @param formatPluginConfig The format plugin config. * @return location of the directory which can be used to create * temporary input files */ - public File makeDataDir(String key, String defaultFormat) { - File dir = makeTempDir(key); - defineWorkspace("dfs", key, dir.getAbsolutePath(), defaultFormat); + public File makeDataDir(String key, String defaultFormat, FormatPluginConfig formatPluginConfig) { + File dir = builder.dirTestWatcher.makeSubDir(Paths.get(key)); + defineWorkspace("dfs", key, dir.getAbsolutePath(), defaultFormat, formatPluginConfig); return dir; }
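The net effect of the ClusterFixture changes above is that the fixture no longer manages its own temp directories or local-store cleanup; the BaseDirTestWatcher passed to builder() supplies the root, tmp, and store directories and, as a JUnit rule, manages their lifecycle per test. A minimal sketch of a test written against the reworked fixture (the test class name and query are illustrative; the fixture calls are the ones introduced in this commit):

import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.junit.Rule;
import org.junit.Test;

public class ExampleFixtureTest {
  // The watcher creates the temp directories the Drillbit needs; builder()
  // wires them into drill.tmp-dir and the local persistent store path.
  @Rule
  public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();

  @Test
  public void runsQuery() throws Exception {
    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
        .maxParallelization(1);
    try (ClusterFixture cluster = builder.build();
         ClientFixture client = cluster.clientFixture()) {
      client.queryBuilder()
          .sql("SELECT * FROM `cp`.`employee.json` LIMIT 10")
          .printCsv();
    }
  }
}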
http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java index 8295554..82bcf75 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java @@ -17,18 +17,18 @@ ******************************************************************************/ package org.apache.drill.test; -import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import com.google.common.base.Preconditions; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ZookeeperHelper; import org.apache.drill.exec.server.options.OptionDefinition; /** * Build a Drillbit and client with the options provided. The simplest - * builder starts an embedded Drillbit, with the "dfs_test" name space, + * builder starts an embedded Drillbit, with the "dfs" name space, * a max width (parallelization) of 2. */ @@ -48,9 +48,6 @@ public class ClusterFixtureBuilder { // in the defaults. public static final int DEFAULT_ZK_REFRESH = 500; // ms - public static final int DEFAULT_SERVER_RPC_THREADS = 10; - public static final int DEFAULT_SCAN_THREADS = 8; - public static final String OPTION_DEFAULTS_ROOT = "drill.exec.options."; protected ConfigBuilder configBuilder = new ConfigBuilder(); protected List<RuntimeOption> sessionOptions; @@ -60,16 +57,18 @@ public class ClusterFixtureBuilder { protected int localZkCount; protected ZookeeperHelper zkHelper; protected boolean usingZk; - protected File tempDir; - protected boolean preserveLocalFiles; protected Properties clientProps; + protected final BaseDirTestWatcher dirTestWatcher; + + public ClusterFixtureBuilder(BaseDirTestWatcher dirTestWatcher) { + this.dirTestWatcher = Preconditions.checkNotNull(dirTestWatcher); + } /** * The configuration builder which this fixture builder uses. * @return the configuration builder for use in setting "advanced" * configuration options. */ - public ConfigBuilder configBuilder() { return configBuilder; } /** @@ -87,7 +86,6 @@ public class ClusterFixtureBuilder { * @return this builder * @see {@link #configProperty(String, Object)} */ - public ClusterFixtureBuilder configResource(String configResource) { // TypeSafe gets unhappy about a leading slash, but other functions @@ -98,21 +96,18 @@ public class ClusterFixtureBuilder { return this; } - /** - * - */ public ClusterFixtureBuilder setOptionDefault(String key, Object value) { - String option_name = OPTION_DEFAULTS_ROOT + key; + String option_name = ExecConstants.OPTION_DEFAULTS_ROOT + key; configBuilder().put(option_name, value.toString()); return this; } + /** * Add an additional boot-time property for the embedded Drillbit. 
* @param key config property name * @param value property value * @return this builder */ - public ClusterFixtureBuilder configProperty(String key, Object value) { configBuilder.put(key, value.toString()); return this; @@ -145,9 +140,7 @@ public class ClusterFixtureBuilder { * @param key the name of the session option * @param value the value of the session option * @return this builder - * @see {@link ClusterFixture#alterSession(String, Object)} */ - public ClusterFixtureBuilder sessionOption(String key, Object value) { if (sessionOptions == null) { sessionOptions = new ArrayList<>(); @@ -163,9 +156,7 @@ public class ClusterFixtureBuilder { * @param key the name of the system option * @param value the value of the system option * @return this builder - * @see {@link ClusterFixture#alterSystem(String, Object)} */ - public ClusterFixtureBuilder systemOption(String key, Object value) { if (systemOptions == null) { systemOptions = new ArrayList<>(); @@ -252,44 +243,13 @@ public class ClusterFixtureBuilder { return this; } - public ClusterFixtureBuilder tempDir(File path) { - this.tempDir = path; - return this; - } - - /** - * Starting with the addition of the CTTAS feature, a Drillbit will - * not restart unless we delete all local storage files before - * starting the Drillbit again. In particular, the stored copies - * of the storage plugin configs cause the temporary workspace - * check to fail. Normally the cluster fixture cleans up files - * both before starting and after shutting down the cluster. Set this - * option to preserve files after shutdown, perhaps to debug the - * contents. - * <p> - * This clean-up is needed only if we enable local storage writes - * (which we must do, unfortunately, to capture and analyze - * storage profiles.) - * - * @return this builder - */ - - public ClusterFixtureBuilder keepLocalFiles() { - preserveLocalFiles = true; - return this; - } - /** * Enable saving of query profiles. The only way to save them is * to enable local store provider writes, which also saves the - * storage plugin configs. Doing so causes the CTTAS feature to - * fail on the next run, so the test fixture deletes all local - * files on start and close, unless - * {@link #keepLocalFiles()} is set. + * storage plugin configs. 
* * @return this builder */ - public ClusterFixtureBuilder saveProfiles() { configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true); systemOption(ExecConstants.ENABLE_QUERY_PROFILE_OPTION, true); @@ -319,7 +279,6 @@ public class ClusterFixtureBuilder { * * @return */ - public ClusterFixture build() { return new ClusterFixture(this); } http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java index 4a3823c..c85c591 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java @@ -19,9 +19,7 @@ package org.apache.drill.test; import java.io.IOException; -import org.apache.drill.TestBuilder; import org.apache.drill.common.AutoCloseables; -import org.apache.drill.test.DrillTest; import org.junit.AfterClass; /** http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java new file mode 100644 index 0000000..99bbacc --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java @@ -0,0 +1,832 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.drill.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.HyperVectorValueIterator; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.HyperVectorWrapper; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.exec.util.Text; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.Assert; + +/** + * An object to encapsulate the options for a Drill unit test, as well as the execution methods to perform the tests and + * validation of results. + * + * To construct an instance easily, look at the TestBuilder class. From an implementation of + * the BaseTestQuery class, an instance of the builder is accessible through the testBuilder() method. + */ +public class DrillTestWrapper { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class); + + public interface TestServices { + BufferAllocator allocator(); + + void test(String query) throws Exception; + + List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception; + } + + // TODO - when in JSON, read baseline in all text mode to avoid precision loss for decimal values + + // This flag will enable all of the values that are validated to be logged. For large validations this is time consuming + // so this is not exposed in a way that it can be enabled for an individual test. It can be changed here while debugging + // a test to see all of the output, but as this framework is doing full validation, there is no reason to keep it on as + // it will only make the test slower. + private static boolean VERBOSE_DEBUG = false; + + // Unit test doesn't expect any specific batch count + public static final int EXPECTED_BATCH_COUNT_NOT_SET = -1; + + // The motivation behind the TestBuilder was to provide a clean API for test writers. The model is mostly designed to + // prepare all of the components necessary for running the tests, before the TestWrapper is initialized. There is however + // one case where the setup for the baseline is driven by the test query results, and this is implicit type enforcement + // for the baseline data. In this case there needs to be a call back into the TestBuilder once we know the type information + // from the test query. 
+ private TestBuilder testBuilder; + /** + * Test query to run. Type of object depends on the {@link #queryType} + */ + private Object query; + // The type of query provided + private UserBitShared.QueryType queryType; + // The type of query provided for the baseline + private UserBitShared.QueryType baselineQueryType; + // should ordering be enforced in the baseline check + private boolean ordered; + private TestServices services; + // queries to run before the baseline or test queries, can be used to set options + private String baselineOptionSettingQueries; + private String testOptionSettingQueries; + // two different methods are available for comparing ordered results, the default reads all of the records + // into giant lists of objects, like one giant on-heap batch of 'vectors' + // this flag enables the other approach which iterates through a hyper batch for the test query results and baseline + // while this does work faster and use less memory, it can be harder to debug as all of the elements are not in a + // single list + private boolean highPerformanceComparison; + // if the baseline is a single option test writers can provide the baseline values and columns + // without creating a file, these are provided to the builder in the baselineValues() and baselineColumns() methods + // and translated into a map in the builder + private String[] baselineColumns; + private List<Map<String, Object>> baselineRecords; + + private int expectedNumBatches; + + public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType, + String baselineOptionSettingQueries, String testOptionSettingQueries, + QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison, + String[] baselineColumns, List<Map<String, Object>> baselineRecords, int expectedNumBatches) { + this.testBuilder = testBuilder; + this.services = services; + this.query = query; + this.queryType = queryType; + this.baselineQueryType = baselineQueryType; + this.ordered = ordered; + this.baselineOptionSettingQueries = baselineOptionSettingQueries; + this.testOptionSettingQueries = testOptionSettingQueries; + this.highPerformanceComparison = highPerformanceComparison; + this.baselineColumns = baselineColumns; + this.baselineRecords = baselineRecords; + this.expectedNumBatches = expectedNumBatches; + } + + public void run() throws Exception { + if (testBuilder.getExpectedSchema() != null) { + compareSchemaOnly(); + } else { + if (ordered) { + compareOrderedResults(); + } else { + compareUnorderedResults(); + } + } + } + + private BufferAllocator getAllocator() { + return services.allocator(); + } + + private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords, + Map<String, HyperVectorValueIterator> actualRecords) throws Exception { + for (String s : expectedRecords.keySet()) { + assertNotNull("Expected column '" + s + "' not found.", actualRecords.get(s)); + assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords()); + HyperVectorValueIterator expectedValues = expectedRecords.get(s); + HyperVectorValueIterator actualValues = actualRecords.get(s); + int i = 0; + while (expectedValues.hasNext()) { + compareValuesErrorOnMismatch(expectedValues.next(), actualValues.next(), i, s); + i++; + } + } + cleanupHyperValueIterators(expectedRecords.values()); + cleanupHyperValueIterators(actualRecords.values()); + } + + private void cleanupHyperValueIterators(Collection<HyperVectorValueIterator> hyperBatches) { + for 
(HyperVectorValueIterator hvi : hyperBatches) { + for (ValueVector vv : hvi.getHyperVector().getValueVectors()) { + vv.clear(); + } + } + } + + public static void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception { + for (String s : actualRecords.keySet()) { + assertNotNull("Unexpected extra column " + s + " returned by query.", expectedRecords.get(s)); + assertEquals("Incorrect number of rows returned by query.", expectedRecords.get(s).size(), actualRecords.get(s).size()); + List<?> expectedValues = expectedRecords.get(s); + List<?> actualValues = actualRecords.get(s); + assertEquals("Different number of records returned", expectedValues.size(), actualValues.size()); + + for (int i = 0; i < expectedValues.size(); i++) { + try { + compareValuesErrorOnMismatch(expectedValues.get(i), actualValues.get(i), i, s); + } catch (Exception ex) { + throw new Exception(ex.getMessage() + "\n\n" + printNearbyRecords(expectedRecords, actualRecords, i), ex); + } + } + } + if (actualRecords.size() < expectedRecords.size()) { + throw new Exception(findMissingColumns(expectedRecords.keySet(), actualRecords.keySet())); + } + } + + private static String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) { + StringBuilder expected = new StringBuilder(); + StringBuilder actual = new StringBuilder(); + expected.append("Expected Records near verification failure:\n"); + actual.append("Actual Records near verification failure:\n"); + int firstRecordToPrint = Math.max(0, offset - 5); + List<?> expectedValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next()); + List<?> actualValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next()); + int numberOfRecordsToPrint = Math.min(Math.min(10, expectedValuesInFirstColumn.size()), actualValuesInFirstColumn.size()); + for (int i = firstRecordToPrint; i < numberOfRecordsToPrint; i++) { + expected.append("Record Number: ").append(i).append(" { "); + actual.append("Record Number: ").append(i).append(" { "); + for (String s : actualRecords.keySet()) { + List<?> actualValues = actualRecords.get(s); + actual.append(s).append(" : ").append(actualValues.get(i)).append(","); + } + for (String s : expectedRecords.keySet()) { + List<?> expectedValues = expectedRecords.get(s); + expected.append(s).append(" : ").append(expectedValues.get(i)).append(","); + } + expected.append(" }\n"); + actual.append(" }\n"); + } + + return expected.append("\n\n").append(actual).toString(); + + } + + private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records, + final RecordBatchLoader loader) + throws SchemaChangeException, UnsupportedEncodingException { + // TODO - this does not handle schema changes + Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>(); + + long totalRecords = 0; + QueryDataBatch batch; + int size = records.size(); + for (int i = 0; i < size; i++) { + batch = records.get(i); + loader.load(batch.getHeader().getDef(), batch.getData()); + logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); + totalRecords += loader.getRecordCount(); + for (VectorWrapper<?> w : loader) { + String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr(); + if (!combinedVectors.containsKey(field)) { + MaterializedField mf = w.getField(); + ValueVector[] vvList = (ValueVector[]) 
Array.newInstance(mf.getValueClass(), 1); + vvList[0] = w.getValueVector(); + combinedVectors.put(field, new HyperVectorValueIterator(mf, new HyperVectorWrapper<>(mf, vvList))); + } else { + combinedVectors.get(field).getHyperVector().addVector(w.getValueVector()); + } + + } + } + for (HyperVectorValueIterator hvi : combinedVectors.values()) { + hvi.determineTotalSize(); + } + return combinedVectors; + } + + private static class BatchIterator implements Iterable<VectorAccessible>, AutoCloseable { + private final List<QueryDataBatch> dataBatches; + private final RecordBatchLoader batchLoader; + + public BatchIterator(List<QueryDataBatch> dataBatches, RecordBatchLoader batchLoader) { + this.dataBatches = dataBatches; + this.batchLoader = batchLoader; + } + + @Override + public Iterator<VectorAccessible> iterator() { + return new Iterator<VectorAccessible>() { + + int index = -1; + + @Override + public boolean hasNext() { + return index < dataBatches.size() - 1; + } + + @Override + public VectorAccessible next() { + index++; + if (index == dataBatches.size()) { + throw new RuntimeException("Tried to call next when iterator had no more items."); + } + batchLoader.clear(); + QueryDataBatch batch = dataBatches.get(index); + try { + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + } catch (SchemaChangeException e) { + throw new RuntimeException(e); + } + return batchLoader; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Removing is not supported"); + } + }; + } + + @Override + public void close() throws Exception { + batchLoader.clear(); + } + + } + + /** + * Iterate over batches, and combine the batches into a map, where key is schema path, and value is + * the list of column values across all the batches. + * @param batches + * @return + * @throws SchemaChangeException + * @throws UnsupportedEncodingException + */ + public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches) + throws SchemaChangeException, UnsupportedEncodingException { + Map<String, List<Object>> combinedVectors = new TreeMap<>(); + addToCombinedVectorResults(batches, null, combinedVectors); + return combinedVectors; + } + + /** + * Add to result vectors and compare batch schema against expected schema while iterating batches. + * @param batches + * @param expectedSchema: the expected schema the batches should contain. Through SchemaChangeException + * if encounter different batch schema. + * @param combinedVectors: the vectors to hold the values when iterate the batches. + * + * @return number of batches + * @throws SchemaChangeException + * @throws UnsupportedEncodingException + */ + public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors) + throws SchemaChangeException, UnsupportedEncodingException { + // TODO - this does not handle schema changes + int numBatch = 0; + long totalRecords = 0; + BatchSchema schema = null; + for (VectorAccessible loader : batches) { + numBatch++; + if (expectedSchema != null) { + if (! expectedSchema.equals(loader.getSchema())) { + throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" + + "Actual schema: %s. Expected schema : %s", + loader.getSchema(), expectedSchema)); + } + } + + // TODO: Clean: DRILL-2933: That load(...) no longer throws + // SchemaChangeException, so check/clean throws clause above. 
+ if (schema == null) { + schema = loader.getSchema(); + for (MaterializedField mf : schema) { + combinedVectors.put(SchemaPath.getSimplePath(mf.getName()).toExpr(), new ArrayList<>()); + } + } else { + // TODO - actually handle schema changes, this is just to get access to the SelectionVectorMode + // of the current batch, the check for a null schema is used to only mutate the schema once + // need to add new vectors and null fill for previous batches? distinction between null and non-existence important? + schema = loader.getSchema(); + } + logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); + totalRecords += loader.getRecordCount(); + for (VectorWrapper<?> w : loader) { + String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr(); + ValueVector[] vectors; + if (w.isHyper()) { + vectors = w.getValueVectors(); + } else { + vectors = new ValueVector[] {w.getValueVector()}; + } + SelectionVector2 sv2 = null; + SelectionVector4 sv4 = null; + switch(schema.getSelectionVectorMode()) { + case TWO_BYTE: + sv2 = loader.getSelectionVector2(); + break; + case FOUR_BYTE: + sv4 = loader.getSelectionVector4(); + break; + } + if (sv4 != null) { + for (int j = 0; j < sv4.getCount(); j++) { + int complexIndex = sv4.get(j); + int batchIndex = complexIndex >> 16; + int recordIndexInBatch = complexIndex & 65535; + Object obj = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch); + if (obj != null) { + if (obj instanceof Text) { + obj = obj.toString(); + } + } + combinedVectors.get(field).add(obj); + } + } + else { + for (ValueVector vv : vectors) { + for (int j = 0; j < loader.getRecordCount(); j++) { + int index; + if (sv2 != null) { + index = sv2.getIndex(j); + } else { + index = j; + } + Object obj = vv.getAccessor().getObject(index); + if (obj != null) { + if (obj instanceof Text) { + obj = obj.toString(); + } + } + combinedVectors.get(field).add(obj); + } + } + } + } + } + return numBatch; + } + + protected void compareSchemaOnly() throws Exception { + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + List<QueryDataBatch> actual = null; + QueryDataBatch batch = null; + try { + test(testOptionSettingQueries); + actual = testRunAndReturn(queryType, query); + batch = actual.get(0); + loader.load(batch.getHeader().getDef(), batch.getData()); + + final BatchSchema schema = loader.getSchema(); + final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = testBuilder.getExpectedSchema(); + if (schema.getFieldCount() != expectedSchema.size()) { + throw new Exception("Expected and actual numbers of columns do not match."); + } + + for (int i = 0; i < schema.getFieldCount(); ++i) { + final String actualSchemaPath = schema.getColumn(i).getName(); + final TypeProtos.MajorType actualMajorType = schema.getColumn(i).getType(); + + final String expectedSchemaPath = expectedSchema.get(i).getLeft().getRootSegmentPath(); + final TypeProtos.MajorType expectedMajorType = expectedSchema.get(i).getValue(); + + if (!actualSchemaPath.equals(expectedSchemaPath) + || !actualMajorType.equals(expectedMajorType)) { + throw new Exception(String.format("Schema path or type mismatch for column #%d:\n" + + "Expected schema path: %s\nActual schema path: %s\nExpected type: %s\nActual type: %s", + i, expectedSchemaPath, actualSchemaPath, Types.toString(expectedMajorType), + Types.toString(actualMajorType))); + } + } + + } finally { + if (actual != null) { + for (QueryDataBatch b : actual) { + b.release(); + } + } + loader.clear(); + 
} + } + + /** + * Use this method only if necessary to validate one query against another. If you are just validating against a + * baseline file use one of the simpler interfaces that will write the validation query for you. + * + * @throws Exception + */ + protected void compareUnorderedResults() throws Exception { + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + + List<QueryDataBatch> actual = Collections.emptyList(); + List<QueryDataBatch> expected = Collections.emptyList(); + List<Map<String, Object>> expectedRecords = new ArrayList<>(); + List<Map<String, Object>> actualRecords = new ArrayList<>(); + + try { + test(testOptionSettingQueries); + actual = testRunAndReturn(queryType, query); + + checkNumBatches(actual); + + addTypeInfoIfMissing(actual.get(0), testBuilder); + addToMaterializedResults(actualRecords, actual, loader); + + // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes + // the cases where the baseline is stored in a file. + if (baselineRecords == null) { + test(baselineOptionSettingQueries); + expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); + addToMaterializedResults(expectedRecords, expected, loader); + } else { + expectedRecords = baselineRecords; + } + + compareResults(expectedRecords, actualRecords); + } finally { + cleanupBatches(actual, expected); + } + } + + /** + * Use this method only if necessary to validate one query against another. If you are just validating against a + * baseline file use one of the simpler interfaces that will write the validation query for you. + * + * @throws Exception + */ + protected void compareOrderedResults() throws Exception { + if (highPerformanceComparison) { + if (baselineQueryType == null) { + throw new Exception("Cannot do a high performance comparison without using a baseline file"); + } + compareResultsHyperVector(); + } else { + compareMergedOnHeapVectors(); + } + } + + public void compareMergedOnHeapVectors() throws Exception { + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + + List<QueryDataBatch> actual = Collections.emptyList(); + List<QueryDataBatch> expected = Collections.emptyList(); + Map<String, List<Object>> actualSuperVectors; + Map<String, List<Object>> expectedSuperVectors = null; + + try { + test(testOptionSettingQueries); + actual = testRunAndReturn(queryType, query); + + checkNumBatches(actual); + + // To avoid extra work for test writers, types can optionally be inferred from the test query + addTypeInfoIfMissing(actual.get(0), testBuilder); + + BatchIterator batchIter = new BatchIterator(actual, loader); + actualSuperVectors = addToCombinedVectorResults(batchIter); + batchIter.close(); + + // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes + // the cases where the baseline is stored in a file. 
+ if (baselineRecords == null) { + if (baselineQueryType == null && baselineColumns != null) { + checkAscendingOrdering(actualSuperVectors); + return; + } else { + test(baselineOptionSettingQueries); + expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); + BatchIterator exBatchIter = new BatchIterator(expected, loader); + expectedSuperVectors = addToCombinedVectorResults(exBatchIter); + exBatchIter.close(); + } + } else { + // data is built in the TestBuilder in a row major format as it is provided by the user + // translate it here to vectorized, the representation expected by the ordered comparison + expectedSuperVectors = translateRecordListToHeapVectors(baselineRecords); + } + + compareMergedVectors(expectedSuperVectors, actualSuperVectors); + } catch (Exception e) { + throw new Exception(e.getMessage() + "\nFor query: " + query , e); + } finally { + cleanupBatches(expected, actual); + } + } + + private void checkAscendingOrdering(Map<String, List<Object>> results) { + int numRecords = results.get(baselineColumns[0]).size(); + + for (int index = 1; index < numRecords; index++) { + int prevIndex = index - 1; + + for (String column: baselineColumns) { + List<Object> objects = results.get(column); + Object prevObject = objects.get(prevIndex); + Object currentObject = objects.get(index); + + Assert.assertTrue(RowSetComparison.ObjectComparator.INSTANCE.compare(prevObject, currentObject) <= 0); + } + } + } + + public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) { + Map<String, List<Object>> ret = new TreeMap<>(); + for (String s : records.get(0).keySet()) { + ret.put(s, new ArrayList<>()); + } + for (Map<String, Object> m : records) { + for (String s : m.keySet()) { + ret.get(s).add(m.get(s)); + } + } + return ret; + } + + public void compareResultsHyperVector() throws Exception { + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + + test(testOptionSettingQueries); + List<QueryDataBatch> results = testRunAndReturn(queryType, query); + + checkNumBatches(results); + + // To avoid extra work for test writers, types can optionally be inferred from the test query + addTypeInfoIfMissing(results.get(0), testBuilder); + + Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader); + + test(baselineOptionSettingQueries); + List<QueryDataBatch> expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); + + Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader); + + compareHyperVectors(expectedSuperVectors, actualSuperVectors); + cleanupBatches(results, expected); + } + + private void checkNumBatches(final List<QueryDataBatch> results) { + if (expectedNumBatches != EXPECTED_BATCH_COUNT_NOT_SET) { + final int actualNumBatches = results.size(); + assertEquals(String.format("Expected %d batches but query returned %d non empty batch(es)%n", expectedNumBatches, + actualNumBatches), expectedNumBatches, actualNumBatches); + } + } + + private void addTypeInfoIfMissing(QueryDataBatch batch, TestBuilder testBuilder) { + if (! 
testBuilder.typeInfoSet()) { + Map<SchemaPath, TypeProtos.MajorType> typeMap = getTypeMapFromBatch(batch); + testBuilder.baselineTypes(typeMap); + } + + } + + private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryDataBatch batch) { + Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>(); + for (int i = 0; i < batch.getHeader().getDef().getFieldCount(); i++) { + typeMap.put(SchemaPath.getSimplePath(MaterializedField.create(batch.getHeader().getDef().getField(i)).getName()), + batch.getHeader().getDef().getField(i).getMajorType()); + } + return typeMap; + } + + @SafeVarargs + private final void cleanupBatches(List<QueryDataBatch>... results) { + for (List<QueryDataBatch> resultList : results ) { + for (QueryDataBatch result : resultList) { + result.release(); + } + } + } + + public static void addToMaterializedResults(List<Map<String, Object>> materializedRecords, + List<QueryDataBatch> records, + RecordBatchLoader loader) + throws SchemaChangeException, UnsupportedEncodingException { + long totalRecords = 0; + QueryDataBatch batch; + int size = records.size(); + for (int i = 0; i < size; i++) { + batch = records.get(0); + loader.load(batch.getHeader().getDef(), batch.getData()); + // TODO: Clean: DRILL-2933: That load(...) no longer throws + // SchemaChangeException, so check/clean throws clause above. + logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); + totalRecords += loader.getRecordCount(); + for (int j = 0; j < loader.getRecordCount(); j++) { + Map<String, Object> record = new TreeMap<>(); + for (VectorWrapper<?> w : loader) { + Object obj = w.getValueVector().getAccessor().getObject(j); + if (obj != null) { + if (obj instanceof Text) { + obj = obj.toString(); + } + record.put(SchemaPath.getSimplePath(w.getField().getName()).toExpr(), obj); + } + record.put(SchemaPath.getSimplePath(w.getField().getName()).toExpr(), obj); + } + materializedRecords.add(record); + } + records.remove(0); + batch.release(); + loader.clear(); + } + } + + public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception { + + if (compareValues(expected, actual, counter, column)) { + return true; + } + if (expected == null) { + throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: null " + + "but received " + actual + "(" + actual.getClass().getSimpleName() + ")"); + } + if (actual == null) { + throw new Exception("unexpected null at position " + counter + " column '" + column + "' should have been: " + expected); + } + if (actual instanceof byte[]) { + throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: " + + new String((byte[])expected, "UTF-8") + " but received " + new String((byte[])actual, "UTF-8")); + } + if (!expected.equals(actual)) { + throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: " + + expected + "(" + expected.getClass().getSimpleName() + ") but received " + actual + "(" + actual.getClass().getSimpleName() + ")"); + } + return true; + } + + public static boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception { + if (expected == null) { + if (actual == null) { + if (VERBOSE_DEBUG) { + logger.debug("(1) at position " + counter + " column '" + column + "' matched value: " + expected ); + } + return true; + } else { + return false; + } + } + if (actual == null) { 
+ return false; + } + if (actual instanceof byte[]) { + if (!Arrays.equals((byte[]) expected, (byte[]) actual)) { + return false; + } else { + if (VERBOSE_DEBUG) { + logger.debug("at position " + counter + " column '" + column + "' matched value " + new String((byte[])expected, "UTF-8")); + } + return true; + } + } + if (!expected.equals(actual)) { + return false; + } else { + if (VERBOSE_DEBUG) { + logger.debug("at position " + counter + " column '" + column + "' matched value: " + expected ); + } + } + return true; + } + + /** + * Compare two result sets, ignoring ordering. + * + * @param expectedRecords - list of records from baseline + * @param actualRecords - list of records from test query, WARNING - this list is destroyed in this method + * @throws Exception + */ + private void compareResults(List<Map<String, Object>> expectedRecords, List<Map<String, Object>> actualRecords) throws Exception { + assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size()); + + int i = 0; + int counter = 0; + boolean found; + for (Map<String, Object> expectedRecord : expectedRecords) { + i = 0; + found = false; + findMatch: + for (Map<String, Object> actualRecord : actualRecords) { + for (String s : actualRecord.keySet()) { + if (!expectedRecord.containsKey(s)) { + throw new Exception("Unexpected column '" + s + "' returned by query."); + } + if (!compareValues(expectedRecord.get(s), actualRecord.get(s), counter, s)) { + i++; + continue findMatch; + } + } + if (actualRecord.size() < expectedRecord.size()) { + throw new Exception(findMissingColumns(expectedRecord.keySet(), actualRecord.keySet())); + } + found = true; + break; + } + if (!found) { + StringBuilder sb = new StringBuilder(); + for (int expectedRecordDisplayCount = 0; + expectedRecordDisplayCount < 10 && expectedRecordDisplayCount < expectedRecords.size(); + expectedRecordDisplayCount++) { + sb.append(printRecord(expectedRecords.get(expectedRecordDisplayCount))); + } + String expectedRecordExamples = sb.toString(); + sb.setLength(0); + for (int actualRecordDisplayCount = 0; + actualRecordDisplayCount < 10 && actualRecordDisplayCount < actualRecords.size(); + actualRecordDisplayCount++) { + sb.append(printRecord(actualRecords.get(actualRecordDisplayCount))); + } + String actualRecordExamples = sb.toString(); + throw new Exception(String.format("After matching %d records, did not find expected record in result set:\n %s\n\n" + + "Some examples of expected records:\n%s\n\n Some examples of records returned by the test query:\n%s", + counter, printRecord(expectedRecord), expectedRecordExamples, actualRecordExamples)); + } else { + actualRecords.remove(i); + counter++; + } + } + assertEquals(0, actualRecords.size()); + } + + private static String findMissingColumns(Set<String> expected, Set<String> actual) { + String missingCols = ""; + for (String colName : expected) { + if (!actual.contains(colName)) { + missingCols += colName + ", "; + } + } + return "Expected column(s) " + missingCols + " not found in result set: " + actual + "."; + } + + private String printRecord(Map<String, ?> record) { + String ret = ""; + for (String s : record.keySet()) { + ret += s + " : " + record.get(s) + ", "; + } + return ret + "\n"; + } + + private void test(String query) throws Exception { + services.test(query); + } + + private List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception { + return services.testRunAndReturn(type, query); + } +} 
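DrillTestWrapper is rarely constructed directly; as its javadoc notes, the TestBuilder assembles the constructor arguments and drives the wrapper. A hedged sketch of the usual flow from a BaseTestQuery subclass (the query and baseline values here are illustrative, not taken from this commit):

// go() builds the DrillTestWrapper shown above and invokes run(), which
// dispatches to the ordered or unordered comparison path.
testBuilder()
    .sqlQuery("SELECT employee_id FROM cp.`employee.json` ORDER BY employee_id LIMIT 2")
    .ordered()
    .baselineColumns("employee_id")
    .baselineValues(1L)
    .baselineValues(2L)
    .go();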
http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java index 67ae4a3..8366b7a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java @@ -24,12 +24,21 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.test.LogFixture.LogFixtureBuilder; import org.apache.drill.test.QueryBuilder.QuerySummary; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.apache.drill.test.rowSet.file.JsonFileBuilder; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import ch.qos.logback.classic.Level; @@ -50,7 +59,6 @@ import ch.qos.logback.classic.Level; * you can create your cluster fixture in a JUnit <tt>{@literal @}Before</tt> * method, and shut it down in <tt>{@literal @}After</tt> method. * <p> - * See {@link org.apache.drill.test.package_info the package overview} for details. */ // Note: Test itself is ignored because this is an example, not a @@ -60,6 +68,15 @@ import ch.qos.logback.classic.Level; public class ExampleTest { /** + * This test watcher creates all the temp directories that are required for an integration test with a Drillbit. The + * {@link ClusterFixture} and {@link BaseTestQuery} classes automatically configure their Drillbits to use the temp + * directories created by this test watcher. Please see {@link BaseDirTestWatcher} and package-info.java. Please see + * {@link #secondTest()} for an example. + */ + @Rule + public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher(); + + /** * Example of the simplest possible test case: set up a default * cluster (with one Drillbit), a corresponding client, run a * query and print the results. @@ -69,38 +86,61 @@ public class ExampleTest { @Test public void firstTest() throws Exception { - try (ClusterFixture cluster = ClusterFixture.standardCluster(); + try (ClusterFixture cluster = ClusterFixture.standardCluster(dirTestWatcher); ClientFixture client = cluster.clientFixture()) { client.queryBuilder().sql("SELECT * FROM `cp`.`employee.json` LIMIT 10").printCsv(); } } /** - * Example that uses the fixture builder to build a cluster fixture. Lets - * you set configuration (boot-time) options, session options, system options - * and more. * <p> - * Also shows how to display the plan JSON and just run a query silently, - * getting just the row count, batch count and run time. - * + * Example that uses the fixture builder to build a cluster fixture. Lets + * you set configuration (boot-time) options, session options, system options + * and more. + * </p> + * <p> + * You can write test files to the {@link BaseDirTestWatcher#getRootDir()} and query them in the test. 
+ * </p> + * <p> + * Also shows how to display the plan JSON and just run a query silently, + * getting just the row count, batch count and run time. + * </p> * @throws Exception if anything goes wrong */ @Test public void secondTest() throws Exception { - ClusterFixtureBuilder builder = ClusterFixture.builder() - .configProperty(ExecConstants.SLICE_TARGET, 10) - ; + try (RootAllocator allocator = new RootAllocator(100_000_000)) { + final File tableFile = dirTestWatcher + .getRootDir() + .toPath() + .resolve("employee.json") + .toFile(); - try (ClusterFixture cluster = builder.build(); - ClientFixture client = cluster.clientFixture()) { - String sql = "SELECT * FROM `cp`.`employee.json` LIMIT 10"; - System.out.println( client.queryBuilder().sql(sql).explainJson() ); - QuerySummary results = client.queryBuilder().sql(sql).run(); - System.out.println(String.format("Read %d rows", results.recordCount())); - // Usually we want to test something. Here, just test that we got - // the 10 records. - assertEquals(10, results.recordCount()); + final BatchSchema schema = new SchemaBuilder() + .add("id", Types.required(TypeProtos.MinorType.VARCHAR)) + .add("name", Types.required(TypeProtos.MinorType.VARCHAR)) + .build(); + + final RowSet rowSet = new RowSetBuilder(allocator, schema) + .add("1", "kiwi") + .add("2", "watermelon") + .build(); + + new JsonFileBuilder(rowSet).build(tableFile); + rowSet.clear(); + + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher).configProperty(ExecConstants.SLICE_TARGET, 10); + + try (ClusterFixture cluster = builder.build(); ClientFixture client = cluster.clientFixture()) { + String sql = "SELECT * FROM `dfs`.`test/employee.json`"; + System.out.println(client.queryBuilder().sql(sql).explainJson()); + QuerySummary results = client.queryBuilder().sql(sql).run(); + System.out.println(String.format("Read %d rows", results.recordCount())); + // Usually we want to test something. Here, just test that we got + // the 2 records. + assertEquals(2, results.recordCount()); + } } } @@ -125,7 +165,7 @@ public class ExampleTest { @Test public void thirdTest() throws Exception { - try (ClusterFixture cluster = ClusterFixture.standardCluster(); + try (ClusterFixture cluster = ClusterFixture.standardCluster(dirTestWatcher); ClientFixture client = cluster.clientFixture()) { String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_5`"; client.queryBuilder().sql(sql).printCsv(); @@ -157,16 +197,15 @@ public class ExampleTest { // All debug messages in the xsort package .logger("org.apache.drill.exec.physical.impl.xsort", Level.DEBUG) // And trace messages for one class. 
- .logger(ExternalSortBatch.class, Level.TRACE) - ; - ClusterFixtureBuilder builder = ClusterFixture.builder() + .logger(ExternalSortBatch.class, Level.TRACE); + + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) // Easy way to run single threaded for easy debugging .maxParallelization(1) // Set some session options .sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2L * 1024 * 1024 * 1024) .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true) - .sessionOption(PlannerSettings.HASHAGG.getOptionName(), false) - ; + .sessionOption(PlannerSettings.HASHAGG.getOptionName(), false); try (LogFixture logs = logBuilder.build(); ClusterFixture cluster = builder.build(); @@ -200,13 +239,11 @@ public class ExampleTest { * * @throws Exception if anything goes wrong */ - @Test public void fifthTest() throws Exception { - ClusterFixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) .maxParallelization(1) - .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true) - ; + .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true); try (ClusterFixture cluster = builder.build(); ClientFixture client = cluster.clientFixture()) { @@ -231,7 +268,6 @@ public class ExampleTest { * * @param args not used */ - public static void main(String[] args) { try { new ExampleTest().firstTest(); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java b/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java new file mode 100644 index 0000000..ec6c91e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/HadoopUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; + +public class HadoopUtils { + public static final String LOCAL_FS_SCHEME = "file://"; + + public static org.apache.hadoop.fs.Path javaToHadoopPath(java.nio.file.Path javaPath) { + return new org.apache.hadoop.fs.Path(javaPath.toUri()); + } + + public static java.nio.file.Path hadoopToJavaPath(org.apache.hadoop.fs.Path hadoopPath) { + final String pathString = hadoopPath.toUri().getPath(); + final URI uri; + + try { + uri = new URI(LOCAL_FS_SCHEME + pathString); + } catch (URISyntaxException e) { + // This should never happen + throw new RuntimeException(e); + } + + return Paths.get(uri); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index e7bf61f..58f888d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.drill.PlanTestBase; -import org.apache.drill.QueryTestUtil; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java new file mode 100644 index 0000000..db3e2ba --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.test; + +import java.util.List; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.drill.test.BaseTestQuery.SilentListener; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.client.PrintingResultsListener; +import org.apache.drill.exec.client.QuerySubmitter.Format; +import org.apache.drill.exec.compile.ClassTransformer; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener; +import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionValue; +import org.apache.drill.exec.server.options.SystemOptionManager; +import org.apache.drill.exec.util.VectorUtil; + +/** + * Utilities useful for tests that issue SQL queries. + */ +public class QueryTestUtil { + + public static final String TEST_QUERY_PRINTING_SILENT = "drill.test.query.printing.silent"; + + /** + * Private constructor; all methods are static. + */ + private QueryTestUtil() { + } + + /** + * Create a DrillClient that can be used to query a drill cluster. + * + * @param drillConfig drill configuration + * @param remoteServiceSet remote service set + * @param maxWidth maximum width per node + * @param props connection properties, such as "user", "password", "schema", etc. + * @return the newly created client + * @throws RpcException if there is a problem setting up the client + */ + public static DrillClient createClient(final DrillConfig drillConfig, final RemoteServiceSet remoteServiceSet, + final int maxWidth, final Properties props) throws RpcException, OutOfMemoryException { + final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator()); + drillClient.connect(props); + + final List<QueryDataBatch> results = drillClient.runQuery( + QueryType.SQL, String.format("alter session set `%s` = %d", + ExecConstants.MAX_WIDTH_PER_NODE_KEY, maxWidth)); + for (QueryDataBatch queryDataBatch : results) { + queryDataBatch.release(); + } + + return drillClient; + } + + /** + * Normalize the query relative to the test environment. + * + * <p>Looks for "${WORKING_PATH}" (or "[WORKING_PATH]") in the query string, and replaces it + * with the current working path obtained from {@link TestTools#WORKING_PATH}. + * + * @param query the query string + * @return the normalized query string + */ + public static String normalizeQuery(final String query) { + if (query.contains("${WORKING_PATH}")) { + return query.replaceAll(Pattern.quote("${WORKING_PATH}"), Matcher.quoteReplacement(TestTools.WORKING_PATH.toString())); + } else if (query.contains("[WORKING_PATH]")) { + return query.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.WORKING_PATH.toString())); + } + return query; + } + + /** + * Execute a SQL query, and print the results. 
+ * + * @param drillClient drill client to use + * @param type type of the query + * @param queryString query string + * @return number of rows returned + * @throws Exception + */ + public static int testRunAndPrint( + final DrillClient drillClient, final QueryType type, final String queryString) throws Exception { + final String query = normalizeQuery(queryString); + DrillConfig config = drillClient.getConfig(); + AwaitableUserResultsListener resultListener = + new AwaitableUserResultsListener( + config.getBoolean(TEST_QUERY_PRINTING_SILENT) ? + new SilentListener() : + new PrintingResultsListener(config, Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH) + ); + drillClient.runQuery(type, query, resultListener); + return resultListener.await(); + } + + /** + * Execute one or more queries separated by semicolons, and print the results. + * + * @param drillClient drill client to use + * @param queryString the query string + * @throws Exception + */ + public static void test(final DrillClient drillClient, final String queryString) throws Exception { + final String query = normalizeQuery(queryString); + String[] queries = query.split(";"); + for (String q : queries) { + final String trimmedQuery = q.trim(); + if (trimmedQuery.isEmpty()) { + continue; + } + testRunAndPrint(drillClient, QueryType.SQL, trimmedQuery); + } + } + + /** + * Execute one or more queries separated by semicolons, and print the results, with the option to + * add formatted arguments to the query string. + * + * @param drillClient drill client to use + * @param query the query string; may contain formatting specifications to be used by + * {@link String#format(String, Object...)}. + * @param args optional args to use in the formatting call for the query string + * @throws Exception + */ + public static void test(final DrillClient drillClient, final String query, Object... args) throws Exception { + test(drillClient, String.format(query, args)); + } + + /** + * Execute a single query with a user-supplied result listener. + * + * @param drillClient drill client to use + * @param type type of query + * @param queryString the query string + * @param resultListener the result listener + */ + public static void testWithListener(final DrillClient drillClient, final QueryType type, + final String queryString, final UserResultsListener resultListener) { + final String query = QueryTestUtil.normalizeQuery(queryString); + drillClient.runQuery(type, query, resultListener); + } + + /** + * Set up the options to test the scalar replacement retry option (see + * ClassTransformer.java). Scalar replacement rewrites bytecode to replace + * value holders (essentially boxed values) with their member variables as + * locals. There is still one pattern that doesn't work, and occasionally new + * ones are introduced. This can be used in tests that exercise failing patterns. + * + * <p>This also flushes the compiled code cache. 
+ * + * @param drillbit the drillbit + * @param srOption the scalar replacement option value to use + * @return the original scalar replacement option setting (so it can be restored) + */ + @SuppressWarnings("resource") + public static OptionValue setupScalarReplacementOption( + final Drillbit drillbit, final ClassTransformer.ScalarReplacementOption srOption) { + // set the system option + final DrillbitContext drillbitContext = drillbit.getContext(); + final SystemOptionManager optionManager = drillbitContext.getOptionManager(); + final OptionValue originalOptionValue = optionManager.getOption(ClassTransformer.SCALAR_REPLACEMENT_OPTION); + optionManager.setLocalOption(ClassTransformer.SCALAR_REPLACEMENT_OPTION, srOption.name().toLowerCase()); + + // flush the code cache + drillbitContext.getCompiler().flushCache(); + + return originalOptionValue; + } + + /** + * Restore the original scalar replacement option returned from + * setupScalarReplacementOption(). + * + * <p>This also flushes the compiled code cache. + * + * @param drillbit the drillbit + * @param srOption the scalar replacement option value to use + */ + public static void restoreScalarReplacementOption(final Drillbit drillbit, final String srOption) { + @SuppressWarnings("resource") + final DrillbitContext drillbitContext = drillbit.getContext(); + @SuppressWarnings("resource") + final OptionManager optionManager = drillbitContext.getOptionManager(); + optionManager.setLocalOption(ClassTransformer.SCALAR_REPLACEMENT_OPTION, srOption); + + // flush the code cache + drillbitContext.getCompiler().flushCache(); + } + +}
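A closing note on normalizeQuery() above: it goes through Pattern.quote() and Matcher.quoteReplacement() because String.replaceAll() treats its first argument as a regular expression and its second as a replacement pattern. The token "${WORKING_PATH}" contains the regex metacharacters '$', '{' and '}' (compiling it unescaped throws a PatternSyntaxException for an illegal repetition), and a substituted path may contain '\' or '$', which would otherwise be read as group references. A small self-contained sketch of that escaping technique (the query and path values here are illustrative only; Drill derives the real path from TestTools.WORKING_PATH):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TokenReplaceSketch {
  public static void main(String[] args) {
    String query = "SELECT * FROM dfs.`${WORKING_PATH}/sample-data/region.parquet`";
    String workingPath = "C:\\drill\\java-exec"; // backslashes would break a raw replaceAll()

    // Pattern.quote() wraps the token in \Q...\E so '$', '{' and '}' match literally;
    // Matcher.quoteReplacement() escapes '\' and '$' in the replacement so the path
    // is inserted verbatim rather than interpreted as group references.
    String normalized = query.replaceAll(
        Pattern.quote("${WORKING_PATH}"),
        Matcher.quoteReplacement(workingPath));

    // Prints: SELECT * FROM dfs.`C:\drill\java-exec/sample-data/region.parquet`
    System.out.println(normalized);
  }
}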