This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 0928427 DRILL-7418: MetadataDirectGroupScan improvements 0928427 is described below commit 09284270b85d35f7a1d521539c42876bb613a6e3 Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> AuthorDate: Tue Oct 22 17:33:04 2019 +0300 DRILL-7418: MetadataDirectGroupScan improvements 1. Replaced files listing with selection root information to reduce query plan size in MetadataDirectGroupScan. 2. Fixed MetadataDirectGroupScan ser / de issues. 3. Added PlanMatcher to QueryBuilder for more convenient plan matching. 4. Re-written TestConvertCountToDirectScan to use ClusterTest. 5. Refactoring and code clean up. --- .../exec/TestHiveDrillNativeParquetReader.java | 19 +- .../exec/physical/base/AbstractGroupScan.java | 5 + .../apache/drill/exec/physical/base/GroupScan.java | 9 +- .../apache/drill/exec/physical/base/ScanStats.java | 46 ++- .../logical/ConvertCountToDirectScanRule.java | 27 +- .../physical/ConvertCountToDirectScanPrule.java | 17 +- .../drill/exec/store/direct/DirectGroupScan.java | 24 +- .../exec/store/direct/MetadataDirectGroupScan.java | 49 ++- .../parquet/AbstractParquetScanBatchCreator.java | 6 +- .../drill/TestFunctionsWithTypeExpoQueries.java | 2 +- .../logical/TestConvertCountToDirectScan.java | 444 ++++++++++++--------- .../java/org/apache/drill/test/ClientFixture.java | 76 ++-- .../java/org/apache/drill/test/QueryBuilder.java | 152 +++++-- 13 files changed, 530 insertions(+), 346 deletions(-) diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java index 5490640..6b9a7cd 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java @@ -64,8 +64,7 @@ public class 
TestHiveDrillNativeParquetReader extends HiveTestBase { int actualRowCount = testSql(query); assertEquals("Expected and actual row count should match", 2, actualRowCount); - testPlanMatchingPatterns(query, - new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null); + testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1"); } @Test @@ -75,8 +74,7 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase { int actualRowCount = testSql(query); assertEquals("Expected and actual row count should match", 1, actualRowCount); - testPlanMatchingPatterns(query, - new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null); + testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1"); } @Test @@ -114,15 +112,14 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase { int actualRowCount = testSql(query); assertEquals("Expected and actual row count should match", 2, actualRowCount); - testPlanMatchingPatterns(query, - new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null); + testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1"); } @Test public void testPartitionedExternalTable() throws Exception { String query = "select * from hive.kv_native_ext"; - testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numFiles=2"}, null); + testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=2"); testBuilder() .sqlQuery(query) @@ -185,14 +182,16 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase { int actualRowCount = testSql(query); assertEquals("Expected and actual row count should match", 2, actualRowCount); - testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null); + testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1"); } @Test public void testConvertCountToDirectScanOptimization() throws Exception { String query = "select count(1) as cnt from hive.kv_native"; - 
testPlanMatchingPatterns(query, new String[]{"DynamicPojoRecordReader"}, null); + testPlanMatchingPatterns(query, "DynamicPojoRecordReader"); + + testPhysicalPlanExecutionBasedOnQuery(query); testBuilder() .sqlQuery(query) @@ -224,7 +223,7 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase { public void testReadAllSupportedHiveDataTypesNativeParquet() throws Exception { String query = "select * from hive.readtest_parquet"; - testPlanMatchingPatterns(query, new String[] {"HiveDrillNativeParquetScan"}, null); + testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan"); testBuilder() .sqlQuery(query) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 4916370..bc21550 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -174,6 +174,11 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca } @Override + public Path getSelectionRoot() { + return null; + } + + @Override public Collection<Path> getFiles() { return null; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 1ba1dd9..ebbb717 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -141,8 +141,15 @@ public interface GroupScan extends Scan, HasAffinity { boolean hasFiles(); /** + * Returns path to the selection root. If this GroupScan cannot provide selection root, it returns null. 
+ * + * @return path to the selection root + */ + Path getSelectionRoot(); + + /** * Returns a collection of file names associated with this GroupScan. This should be called after checking - * hasFiles(). If this GroupScan cannot provide file names, it returns null. + * hasFiles(). If this GroupScan cannot provide file names, it returns null. * * @return collection of files paths */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java index 721f723..596dc5b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java @@ -17,22 +17,44 @@ */ package org.apache.drill.exec.physical.base; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + public class ScanStats { public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1); public static final ScanStats ZERO_RECORD_TABLE = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, 0, 1, 1); + @JsonProperty + private final GroupScanProperty groupScanProperty; + @JsonProperty private final double recordCount; + @JsonProperty private final double cpuCost; + @JsonProperty private final double diskCost; - private final GroupScanProperty property; - public ScanStats(GroupScanProperty property, double recordCount, double cpuCost, double diskCost) { + @JsonCreator + public ScanStats(@JsonProperty("groupScanProperty") GroupScanProperty groupScanProperty, + @JsonProperty("recordCount") double recordCount, + @JsonProperty("cpuCost") double cpuCost, + @JsonProperty("diskCost") double diskCost) { + this.groupScanProperty = groupScanProperty; this.recordCount = recordCount; this.cpuCost = cpuCost; this.diskCost = diskCost; - this.property = property; + } + + /** + * Return if GroupScan knows the exact 
row count in the result of getSize() call. + * By default, group scan does not know the exact row count, before it scans every rows. + * Currently, parquet group scan will return the exact row count. + * + * @return group scan property + */ + public GroupScanProperty getGroupScanProperty() { + return groupScanProperty; } public double getRecordCount() { @@ -49,20 +71,14 @@ public class ScanStats { @Override public String toString() { - return "ScanStats{" + "recordCount=" + recordCount + ", cpuCost=" + cpuCost + ", diskCost=" + diskCost + ", property=" + property + '}'; + return "ScanStats{" + + "recordCount=" + recordCount + + ", cpuCost=" + cpuCost + + ", diskCost=" + diskCost + + ", groupScanProperty=" + groupScanProperty + + '}'; } - /** - * Return if GroupScan knows the exact row count in the result of getSize() call. - * By default, groupscan does not know the exact row count, before it scans every rows. - * Currently, parquet group scan will return the exact row count. - */ - public GroupScanProperty getGroupScanProperty() { - return property; - } - - - public enum GroupScanProperty { NO_EXACT_ROW_COUNT(false, false), EXACT_ROW_COUNT(true, true); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java index fb1bd2f..28c23f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java @@ -48,14 +48,14 @@ import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.exec.store.parquet.metadata.Metadata; import org.apache.drill.exec.store.parquet.metadata.Metadata_V4; import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import 
org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.LinkedHashMap; import java.util.Set; @@ -99,9 +99,9 @@ public class ConvertCountToDirectScanRule extends RelOptRule { RelOptHelper.some(Aggregate.class, RelOptHelper.any(TableScan.class)), "Agg_on_scan"); - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class); + private static final Logger logger = LoggerFactory.getLogger(ConvertCountToDirectScanRule.class); - protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) { + private ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) { super(rule, "ConvertCountToDirectScanRule:" + id); } @@ -153,7 +153,7 @@ public class ConvertCountToDirectScanRule extends RelOptRule { Metadata_V4.MetadataSummary metadataSummary = status.getRight(); Map<String, Long> result = collectCounts(settings, metadataSummary, agg, scan, project); - logger.trace("Calculated the following aggregate counts: ", result); + logger.trace("Calculated the following aggregate counts: {}", result); // if counts could not be determined, rule won't be applied if (result.isEmpty()) { @@ -161,17 +161,16 @@ public class ConvertCountToDirectScanRule extends RelOptRule { return; } - List<Path> fileList = - ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot())); + Path summaryFileName = Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot()); final RelDataType scanRowType = CountToDirectScanUtils.constructDataType(agg, result.keySet()); final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>( CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()), - 
Collections.singletonList((List<Long>) new ArrayList<>(result.values()))); + Collections.singletonList(new ArrayList<>(result.values()))); final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount()); - final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, fileList, scanStats, true); + final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, summaryFileName, 1, scanStats, true); final DrillDirectScanRel newScan = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), directScan, scanRowType); @@ -190,16 +189,16 @@ public class ConvertCountToDirectScanRule extends RelOptRule { if (!((formatConfig instanceof ParquetFormatConfig) || ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) { - return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null); + return new ImmutablePair<>(false, null); } FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin(); - DrillFileSystem fs = null; + DrillFileSystem fs; try { fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf()); } catch (IOException e) { logger.warn("Unable to create the file system object for retrieving statistics from metadata cache file ", e); - return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null); + return new ImmutablePair<>(false, null); } // check if the cacheFileRoot has been set: this is needed because after directory pruning, the @@ -215,8 +214,7 @@ public class ConvertCountToDirectScanRule extends RelOptRule { Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs, selectionRoot, false, parquetReaderConfig); - return metadataSummary != null ? 
new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(true, metadataSummary) : - new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null); + return metadataSummary != null ? new ImmutablePair<>(true, metadataSummary) : new ImmutablePair<>(false, null); } /** @@ -311,5 +309,4 @@ public class ConvertCountToDirectScanRule extends RelOptRule { return ImmutableMap.copyOf(result); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java index ac8f3ca..0900ba7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.planner.physical; -import java.util.List; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.Map; @@ -44,6 +43,8 @@ import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.direct.MetadataDirectGroupScan; import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p> @@ -89,9 +90,9 @@ public class ConvertCountToDirectScanPrule extends Prule { RelOptHelper.some(DrillAggregateRel.class, RelOptHelper.any(DrillScanRel.class)), "Agg_on_scan"); - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanPrule.class); + private static final Logger logger = LoggerFactory.getLogger(ConvertCountToDirectScanPrule.class); - protected ConvertCountToDirectScanPrule(RelOptRuleOperand rule, String id) { + private ConvertCountToDirectScanPrule(RelOptRuleOperand rule, String id) { super(rule, "ConvertCountToDirectScanPrule:" + id); } @@ 
-125,10 +126,11 @@ public class ConvertCountToDirectScanPrule extends Prule { final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>( CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()), - Collections.singletonList((List<Long>) new ArrayList<>(result.values()))); + Collections.singletonList(new ArrayList<>(result.values()))); final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount()); - final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats, false); + final int numFiles = oldGrpScan.hasFiles() ? oldGrpScan.getFiles().size() : -1; + final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getSelectionRoot(), numFiles, scanStats, false); final DirectScanPrel newScan = DirectScanPrel.create(scan, scan.getTraitSet().plus(Prel.DRILL_PHYSICAL) .plus(DrillDistributionTrait.SINGLETON), directScan, scanRowType); @@ -145,7 +147,7 @@ public class ConvertCountToDirectScanPrule extends Prule { * * For each aggregate call will determine if count can be calculated. Collects counts only for COUNT function. * For star, not null expressions and implicit columns sets count to total record number. - * For other cases obtains counts from group scan operator. Also count can not be calculated for parition columns. + * For other cases obtains counts from group scan operator. Also count can not be calculated for partition columns. 
* * @param agg aggregate relational expression * @param scan scan relational expression @@ -155,7 +157,7 @@ public class ConvertCountToDirectScanPrule extends Prule { private Map<String, Long> collectCounts(PlannerSettings settings, DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) { final Set<String> implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet(); final GroupScan oldGrpScan = scan.getGroupScan(); - final long totalRecordCount = (long)oldGrpScan.getScanStats(settings).getRecordCount(); + final long totalRecordCount = (long) oldGrpScan.getScanStats(settings).getRecordCount(); final LinkedHashMap<String, Long> result = new LinkedHashMap<>(); for (int i = 0; i < agg.getAggCallList().size(); i++) { @@ -218,5 +220,4 @@ public class ConvertCountToDirectScanPrule extends Prule { return ImmutableMap.copyOf(result); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java index 67b2e5c..6c49943 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java @@ -17,10 +17,11 @@ */ package org.apache.drill.exec.store.direct; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -34,26 +35,30 @@ import java.util.List; 
@JsonTypeName("direct-scan") public class DirectGroupScan extends AbstractGroupScan { + @JsonProperty protected final RecordReader reader; + @JsonProperty protected final ScanStats stats; public DirectGroupScan(RecordReader reader) { this(reader, ScanStats.TRIVIAL_TABLE); } - public DirectGroupScan(RecordReader reader, ScanStats stats) { + @JsonCreator + public DirectGroupScan(@JsonProperty("reader") RecordReader reader, + @JsonProperty("stats") ScanStats stats) { super((String) null); this.reader = reader; this.stats = stats; } @Override - public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { + public void applyAssignments(List<DrillbitEndpoint> endpoints) { assert endpoints.size() == 1; } @Override - public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException { + public SubScan getSpecificScan(int minorFragmentId) { assert minorFragmentId == 0; return new DirectSubScan(reader); } @@ -68,8 +73,14 @@ public class DirectGroupScan extends AbstractGroupScan { return stats; } + @JsonIgnore @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + public List<SchemaPath> getColumns() { + return super.getColumns(); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { assert children == null || children.isEmpty(); return new DirectGroupScan(reader, stats); } @@ -83,5 +94,4 @@ public class DirectGroupScan extends AbstractGroupScan { public GroupScan clone(List<SchemaPath> columns) { return this; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java index 4fea456..da63f80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.store.direct; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; @@ -25,7 +27,6 @@ import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.store.RecordReader; import org.apache.hadoop.fs.Path; -import java.util.Collection; import java.util.List; /** @@ -37,20 +38,34 @@ import java.util.List; @JsonTypeName("metadata-direct-scan") public class MetadataDirectGroupScan extends DirectGroupScan { - private final Collection<Path> files; - private boolean usedMetadataSummaryFile = false; + @JsonProperty + private final Path selectionRoot; + @JsonProperty + private final int numFiles; + @JsonProperty + private boolean usedMetadataSummaryFile; - public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files, ScanStats stats, - boolean usedMetadataSummaryFile) { + @JsonCreator + public MetadataDirectGroupScan(@JsonProperty("reader") RecordReader reader, + @JsonProperty("selectionRoot") Path selectionRoot, + @JsonProperty("numFiles") int numFiles, + @JsonProperty("stats") ScanStats stats, + @JsonProperty("usedMetadataSummaryFile") boolean usedMetadataSummaryFile) { super(reader, stats); - this.files = files; + this.selectionRoot = selectionRoot; + this.numFiles = numFiles; this.usedMetadataSummaryFile = usedMetadataSummaryFile; } @Override + public Path getSelectionRoot() { + return selectionRoot; + } + + @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { assert children == null || children.isEmpty(); - return new MetadataDirectGroupScan(reader, files, stats, usedMetadataSummaryFile); + return new MetadataDirectGroupScan(reader, selectionRoot, 
numFiles, stats, usedMetadataSummaryFile); } @Override @@ -61,25 +76,25 @@ public class MetadataDirectGroupScan extends DirectGroupScan { /** * <p> * Returns string representation of group scan data. - * Includes list of files if present. + * Includes selection root, number of files, if metadata summary file was used, + * such data is present. * </p> * * <p> - * Example: [files = [/tmp/0_0_0.parquet], numFiles = 1] + * Example: [selectionRoot = [/tmp/users], numFiles = 1, usedMetadataSummaryFile = false] * </p> * * @return string representation of group scan data */ @Override public String getDigest() { - if (files != null) { - StringBuilder builder = new StringBuilder(); - builder.append("files = ").append(files).append(", "); - builder.append("numFiles = ").append(files.size()).append(", "); - builder.append("usedMetadataSummaryFile = ").append(usedMetadataSummaryFile).append(", "); - return builder.append(super.getDigest()).toString(); + StringBuilder builder = new StringBuilder(); + if (selectionRoot != null) { + builder.append("selectionRoot = ").append(selectionRoot).append(", "); } - return super.getDigest(); + builder.append("numFiles = ").append(numFiles).append(", "); + builder.append("usedMetadataSummaryFile = ").append(usedMetadataSummaryFile).append(", "); + builder.append(super.getDigest()); + return builder.toString(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java index eae151e..838180d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java @@ -273,8 +273,8 @@ public abstract class AbstractParquetScanBatchCreator { * @param rowGroup create a reader for this specific row group * @param fs file system * @param 
footer this file's footer - * // @param readSchemaOnly - if true sets the number of rows to read to be zero - * @return the (possibly modified) input mapWithMaxColumns + * @param readSchemaOnly if true sets the number of rows to read to be zero + * @return the (possibly modified) input mapWithMaxColumns */ private Map<String, String> createReaderAndImplicitColumns(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, @@ -347,7 +347,7 @@ public abstract class AbstractParquetScanBatchCreator { /** * Helper class responsible for creating and managing DrillFileSystem. */ - protected abstract class AbstractDrillFileSystemManager { + protected static abstract class AbstractDrillFileSystemManager { protected final OperatorContext operatorContext; diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java index 207638d..0c892da 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java @@ -277,7 +277,7 @@ public class TestFunctionsWithTypeExpoQueries extends BaseTestQuery { "where concat(a, 'asdf') = 'asdf'"; // Validate the plan - final String[] expectedPlan = {"Scan.*a.parquet.*numFiles = 1"}; + final String[] expectedPlan = {"Scan.*metadata_caching.*numFiles = 1"}; final String[] excludedPlan = {"Filter"}; PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java index c35ab2d..ae33f0a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java @@ -17,364 +17,426 @@ */ package org.apache.drill.exec.planner.logical; -import org.apache.drill.PlanTestBase; import org.apache.drill.categories.PlannerTest; +import org.apache.drill.categories.UnlikelyTest; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import java.nio.file.Paths; -@Category(PlannerTest.class) -public class TestConvertCountToDirectScan extends PlanTestBase { +import static org.junit.Assert.assertEquals; + +@Category({PlannerTest.class, UnlikelyTest.class}) +public class TestConvertCountToDirectScan extends ClusterTest { @BeforeClass - public static void setupTestFiles() { + public static void setup() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); dirTestWatcher.copyResourceToRoot(Paths.get("directcount.parquet")); + startCluster(builder); } @Test - public void ensureCaseDoesNotConvertToDirectScan() throws Exception { - testPlanMatchingPatterns( - "select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey else null end) as cnt\n" + - "from dfs.`directcount.parquet`", new String[]{"CASE"}); + public void testCaseDoesNotConvertToDirectScan() throws Exception { + queryBuilder() + .sql("select " + + "count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey else null end) as cnt " + + "from dfs.`directcount.parquet`") + .planMatcher() + .include("CASE") + .match(); } @Test - public void ensureConvertSimpleCountToDirectScan() throws Exception { + public void testConvertSimpleCountToDirectScan() throws Exception { String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`"; - testPlanMatchingPatterns(sql, new 
String[]{"DynamicPojoRecordReader"}); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("cnt") - .baselineValues(25L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(25L) + .go(); } @Test - public void ensureConvertSimpleCountConstToDirectScan() throws Exception { + public void testConvertSimpleCountConstToDirectScan() throws Exception { String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`"; - testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"}); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("cnt") - .baselineValues(25L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(25L) + .go(); } @Test - public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception { + public void testConvertSimpleCountConstExprToDirectScan() throws Exception { String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`"; - testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"}); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("cnt") - .baselineValues(25L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(25L) + .go(); } @Test - public void ensureDoesNotConvertForDirectoryColumns() throws Exception { + public void testDoesNotConvertForDirectoryColumns() throws Exception { String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`"; - testPlanMatchingPatterns(sql, new String[]{"ParquetGroupScan"}); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("ParquetGroupScan") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("cnt") - 
.baselineValues(0L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(0L) + .go(); } @Test - public void ensureConvertForImplicitColumns() throws Exception { + public void testConvertForImplicitColumns() throws Exception { String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`"; - testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"}); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("cnt") - .baselineValues(25L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(25L) + .go(); } @Test public void ensureConvertForSeveralColumns() throws Exception { - test("use dfs.tmp"); + run("use dfs.tmp"); String tableName = "parquet_table_counts"; try { String newFqnColumnName = "new_fqn"; - test("alter session set `%s` = '%s'", ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName); - test("create table %s as select * from cp.`parquet/alltypes_optional.parquet`", tableName); - test("refresh table metadata %s", tableName); + client.alterSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName); + run("create table %s as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select\n" + - "count(%s) as implicit_count,\n" + - "count(*) as star_count,\n" + - "count(col_int) as int_column_count,\n" + - "count(col_vrchr) as vrchr_column_count\n" + - "from %s", newFqnColumnName, tableName); - - testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"}); + "count(%s) as implicit_count,\n" + + "count(*) as star_count,\n" + + "count(col_int) as int_column_count,\n" + + "count(col_vrchr) as vrchr_column_count\n" + + "from %s", newFqnColumnName, tableName); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("DynamicPojoRecordReader") + .match(); testBuilder() - 
.sqlQuery(sql) - .unOrdered() - .baselineColumns("implicit_count", "star_count", "int_column_count", "vrchr_column_count") - .baselineValues(6L, 6L, 2L, 3L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("implicit_count", "star_count", "int_column_count", "vrchr_column_count") + .baselineValues(6L, 6L, 2L, 3L) + .go(); } finally { - test("alter session reset `%s`", ExecConstants.IMPLICIT_FQN_COLUMN_LABEL); - test("drop table if exists %s", tableName); + client.resetSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL); + run("drop table if exists %s", tableName); } } @Test - public void ensureCorrectCountWithMissingStatistics() throws Exception { - test("use dfs.tmp"); + public void testCorrectCountWithMissingStatistics() throws Exception { + run("use dfs.tmp"); String tableName = "wide_str_table"; try { // table will contain two partitions: one - with null value, second - with non null value - test("create table %s partition by (col_str) as select * from cp.`parquet/wide_string.parquet`", tableName); + run("create table %s partition by (col_str) as select * from cp.`parquet/wide_string.parquet`", tableName); - String query = String.format("select count(col_str) as cnt_str, count(*) as cnt_total from %s", tableName); + String sql = String.format("select count(col_str) as cnt_str, count(*) as cnt_total from %s", tableName); // direct scan should not be applied since we don't have statistics - testPlanMatchingPatterns(query, null, new String[]{"DynamicPojoRecordReader"}); + queryBuilder() + .sql(sql) + .planMatcher() + .exclude("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(query) + .sqlQuery(sql) .unOrdered() .baselineColumns("cnt_str", "cnt_total") .baselineValues(1L, 2L) .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } @Test public void testCountsWithMetadataCacheSummary() throws Exception { - test("use dfs.tmp"); + run("use dfs.tmp"); + String tableName = 
"parquet_table_counts"; try { - test(String.format("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); + run("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); - test("refresh table metadata %s", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select\n" + - "count(*) as star_count,\n" + - "count(col_int) as int_column_count,\n" + - "count(col_vrchr) as vrchr_column_count\n" + - "from %s", tableName); - - int expectedNumFiles = 1; - String numFilesPattern = "numFiles = " + expectedNumFiles; - String usedMetaSummaryPattern = "usedMetadataSummaryFile = true"; - String recordReaderPattern = "DynamicPojoRecordReader"; - - testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern}); + "count(*) as star_count,\n" + + "count(col_int) as int_column_count,\n" + + "count(col_vrchr) as vrchr_column_count\n" + + "from %s", tableName); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("numFiles = 1") + .include("usedMetadataSummaryFile = true") + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("star_count", "int_column_count", "vrchr_column_count") - .baselineValues(24L, 8L, 12L) - .go(); + .sqlQuery(sql) + .unOrdered() + 
.baselineColumns("star_count", "int_column_count", "vrchr_column_count") + .baselineValues(24L, 8L, 12L) + .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } @Test public void testCountsWithMetadataCacheSummaryAndDirPruning() throws Exception { - test("use dfs.tmp"); + run("use dfs.tmp"); String tableName = "parquet_table_counts"; try { - test(String.format("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); + run("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); - test("refresh table metadata %s", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select\n" + - "count(*) as star_count,\n" + - "count(col_int) as int_column_count,\n" + - "count(col_vrchr) as vrchr_column_count\n" + - "from %s where dir0 = 1 ", tableName); - - int expectedNumFiles = 1; - String numFilesPattern = "numFiles = " + expectedNumFiles; - String usedMetaSummaryPattern = "usedMetadataSummaryFile = true"; - String recordReaderPattern = "DynamicPojoRecordReader"; - - testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern}); + "count(*) as star_count,\n" + + "count(col_int) as int_column_count,\n" + + "count(col_vrchr) as vrchr_column_count\n" + + "from 
%s where dir0 = 1 ", tableName); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("numFiles = 1") + .include("usedMetadataSummaryFile = true") + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("star_count", "int_column_count", "vrchr_column_count") - .baselineValues(6L, 2L, 3L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("star_count", "int_column_count", "vrchr_column_count") + .baselineValues(6L, 2L, 3L) + .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } @Test public void testCountsWithWildCard() throws Exception { - test("use dfs.tmp"); + run("use dfs.tmp"); String tableName = "parquet_table_counts"; try { for (int i = 0; i < 10; i++) { - test(String.format("create table `%s/12/%s` as select * from cp.`tpch/nation.parquet`", tableName, i)); + run("create table `%s/12/%s` as select * from cp.`tpch/nation.parquet`", tableName, i); } - test(String.format("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName)); - test(String.format("create table `%s/2/11` as select * from cp.`tpch/nation.parquet`", tableName)); - test(String.format("create table `%s/2/12` as select * from cp.`tpch/nation.parquet`", tableName)); + run("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName); + run("create table `%s/2/11` as select * from cp.`tpch/nation.parquet`", tableName); + run("create table `%s/2/12` as select * from cp.`tpch/nation.parquet`", tableName); - test("refresh table metadata %s", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select\n" + - "count(*) as star_count\n" + - "from `%s/1*`", tableName); - - String usedMetaSummaryPattern = "usedMetadataSummaryFile = false"; - String recordReaderPattern = "DynamicPojoRecordReader"; + "count(*) as star_count\n" + + "from `%s/1*`", tableName); - testPlanMatchingPatterns(sql, new 
String[]{usedMetaSummaryPattern, recordReaderPattern}); + queryBuilder() + .sql(sql) + .planMatcher() + .include("usedMetadataSummaryFile = false") + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("star_count") - .baselineValues(250L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("star_count") + .baselineValues(250L) + .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } @Test public void testCountsForLeafDirectories() throws Exception { - test("use dfs.tmp"); + run("use dfs.tmp"); String tableName = "parquet_table_counts"; try { - test("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName); - test("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName); - test("create table `%s/3` as select * from cp.`tpch/nation.parquet`", tableName); - test("refresh table metadata %s", tableName); + run("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName); + run("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName); + run("create table `%s/3` as select * from cp.`tpch/nation.parquet`", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select\n" + - "count(*) as star_count\n" + - "from `%s/1`", tableName); - - int expectedNumFiles = 1; - String numFilesPattern = "numFiles = " + expectedNumFiles; - String usedMetaSummaryPattern = "usedMetadataSummaryFile = true"; - String recordReaderPattern = "DynamicPojoRecordReader"; + "count(*) as star_count\n" + + "from `%s/1`", tableName); - testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern}); + queryBuilder() + .sql(sql) + .planMatcher() + .include("numFiles = 1") + .include("usedMetadataSummaryFile = true") + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - 
.baselineColumns("star_count") - .baselineValues(25L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("star_count") + .baselineValues(25L) + .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } @Test public void testCountsForDirWithFilesAndDir() throws Exception { - test("use dfs.tmp"); + run("use dfs.tmp"); String tableName = "parquet_table_counts"; try { - test("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName); - test("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`", tableName); - test("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`", tableName); - test("refresh table metadata %s", tableName); + run("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName); + run("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`", tableName); + run("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select count(*) as star_count from `%s/1`", tableName); - int expectedNumFiles = 1; - String numFilesPattern = "numFiles = " + expectedNumFiles; - String usedMetaSummaryPattern = "usedMetadataSummaryFile = true"; - String recordReaderPattern = "DynamicPojoRecordReader"; - - testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern}); + queryBuilder() + .sql(sql) + .planMatcher() + .include("numFiles = 1") + .include("usedMetadataSummaryFile = true") + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("star_count") - .baselineValues(75L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("star_count") + .baselineValues(75L) + .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } @Test - public void testCountsWithNonExColumn() throws Exception { - 
test("use dfs.tmp"); + public void testCountsWithNonExistingColumn() throws Exception { + run("use dfs.tmp"); String tableName = "parquet_table_counts_nonex"; try { - test(String.format("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); - test(String.format("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName)); + run("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + run("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName); - test("refresh table metadata %s", tableName); + run("refresh table metadata %s", tableName); String sql = String.format("select\n" + - "count(*) as star_count,\n" + - "count(col_int) as int_column_count,\n" + - "count(col_vrchr) as vrchr_column_count,\n" + - "count(non_existent) as non_existent\n" + - "from %s", tableName); - - String usedMetaSummaryPattern = "usedMetadataSummaryFile = true"; - String recordReaderPattern = "DynamicPojoRecordReader"; - - testPlanMatchingPatterns(sql, new String[]{usedMetaSummaryPattern, recordReaderPattern}); + "count(*) as star_count,\n" + + "count(col_int) as int_column_count,\n" + + "count(col_vrchr) as vrchr_column_count,\n" + + "count(non_existent) as non_existent\n" + + "from %s", tableName); + + queryBuilder() + .sql(sql) + .planMatcher() + .include("numFiles = 1") + .include("usedMetadataSummaryFile = true") + .include("DynamicPojoRecordReader") + .match(); testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineColumns("star_count", 
"int_column_count", "vrchr_column_count", "non_existent" ) - .baselineValues(24L, 8L, 12L, 0L) - .go(); + .sqlQuery(sql) + .unOrdered() + .baselineColumns("star_count", "int_column_count", "vrchr_column_count", "non_existent" ) + .baselineValues(24L, 8L, 12L, 0L) + .go(); } finally { - test("drop table if exists %s", tableName); + run("drop table if exists %s", tableName); } } + + @Test + public void testSerDe() throws Exception { + String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`"; + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + assertEquals("Counts should match", 25L, cnt); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java index 17b838c..bbf684e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java @@ -19,7 +19,6 @@ package org.apache.drill.test; import java.io.BufferedReader; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.Reader; @@ -43,6 +42,8 @@ import org.apache.drill.exec.testing.ControlsInjectionUtil; import org.apache.drill.test.ClusterFixture.FixtureTestServices; import org.apache.drill.test.QueryBuilder.QuerySummary; import org.apache.drill.exec.physical.rowSet.RowSetBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents a Drill client. 
Provides many useful test-specific operations such @@ -52,7 +53,8 @@ import org.apache.drill.exec.physical.rowSet.RowSetBuilder; */ public class ClientFixture implements AutoCloseable { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientFixture.class); + + private static final Logger logger = LoggerFactory.getLogger(ClientFixture.class); public static class ClientBuilder { @@ -119,11 +121,9 @@ public class ClientFixture implements AutoCloseable { /** * Set a runtime option. * - * @param key - * @param value - * @throws RpcException + * @param key option name + * @param value option value */ - public void alterSession(String key, Object value) { String sql = "ALTER SESSION SET `" + key + "` = " + ClusterFixture.stringify(value); runSqlSilently(sql); @@ -136,9 +136,8 @@ public class ClientFixture implements AutoCloseable { /** * Reset a system option - * @param key + * @param key option name */ - public void resetSession(String key) { runSqlSilently("ALTER SESSION RESET `" + key + "`"); } @@ -148,19 +147,19 @@ public class ClientFixture implements AutoCloseable { } /** - * Run SQL silently (discard results.) + * Run SQL silently (discard results). * - * @param sql - * @throws RpcException + * @param sql query + * @param args format params + * @throws IllegalStateException if something goes wrong */ - - public void runSqlSilently(String sql) { + public void runSqlSilently(String sql, Object... args) { try { - queryBuilder().sql(sql).run(); + queryBuilder().sql(sql, args).run(); } catch (Exception e) { // Should not fail during tests. Convert exception to unchecked // to simplify test code. - new IllegalStateException(e); + throw new IllegalStateException(e); } } @@ -182,9 +181,11 @@ public class ClientFixture implements AutoCloseable { /** * Run zero or more queries and output the results in TSV format. 
+ * + * @param queryString query string + * @param print if query result should be printed */ - private void runQueriesAndOutput(final String queryString, - final boolean print) throws Exception { + private void runQueriesAndOutput(final String queryString, final boolean print) { final String query = QueryTestUtil.normalizeQuery(queryString); String[] queries = query.split(";"); for (String q : queries) { @@ -203,24 +204,30 @@ public class ClientFixture implements AutoCloseable { /** * Run zero or more queries and log the output in TSV format. + * + * @param queryString query string */ - public void runQueriesAndLog(final String queryString) throws Exception { + public void runQueriesAndLog(final String queryString) { runQueriesAndOutput(queryString, false); } /** * Run zero or more queries and print the output in TSV format. + * + * @param queryString query string */ - public void runQueriesAndPrint(final String queryString) throws Exception { + public void runQueriesAndPrint(final String queryString) { runQueriesAndOutput(queryString, true); } /** * Plan a query without execution. - * @throws ExecutionException - * @throws InterruptedException + * + * @param type query type + * @param query query string + * @param isSplitPlan option to tell whether to return single or split plans for a query + * @return query plan fragments */ - public QueryPlanFragments planQuery(QueryType type, String query, boolean isSplitPlan) { DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(type, query, isSplitPlan); try { @@ -251,11 +258,10 @@ public class ClientFixture implements AutoCloseable { * Return a parsed query profile for a query summary. Saving of profiles * must be turned on. 
* - * @param summary - * @return - * @throws IOException + * @param summary query summary + * @return profile parser + * @throws IOException if unable to parse query profile */ - public ProfileParser parseProfile(QuerySummary summary) throws IOException { return parseProfile(summary.queryIdString()); } @@ -264,9 +270,11 @@ public class ClientFixture implements AutoCloseable { * Parse a query profile from the local storage location given the * query ID. Saving of profiles must be turned on. This is a bit of * a hack: the profile should be available directly from the server. - * @throws IOException + * + * @param queryId query ID + * @return profile parser + * @throws IOException if unable to parse the profile */ - public ProfileParser parseProfile(String queryId) throws IOException { File file = new File(cluster.getProfileDir(), queryId + ".sys.drill"); return new ProfileParser(file); @@ -281,7 +289,6 @@ public class ClientFixture implements AutoCloseable { * @param controls the controls string created by * {@link org.apache.drill.exec.testing.Controls#newBuilder()} builder. */ - public void setControls(String controls) { ControlsInjectionUtil.validateControlsString(controls); alterSession(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls); @@ -309,10 +316,8 @@ public class ClientFixture implements AutoCloseable { * <li><tt>ALTER SESSION SET `foo` = ";"</tt></li> * <li><tt>SELECT * FROM bar WHERE x = "\";"</tt></li> */ - public static class StatementParser { private final Reader in; - private StringBuilder buf; public StatementParser(Reader in) { this.in = in; @@ -320,7 +325,7 @@ public class ClientFixture implements AutoCloseable { public String parseNext() throws IOException { boolean eof = false; - buf = new StringBuilder(); + StringBuilder buf = new StringBuilder(); for (;;) { int c = in.read(); if (c == -1) { @@ -377,11 +382,12 @@ public class ClientFixture implements AutoCloseable { /** * Execute a set of statements from a file. 
+ * + * @param source the set of statements, separated by semicolons + * @return the number of statements executed + * @throws IOException if unable to execute statements from file + */ - - public int exec(File source) throws FileNotFoundException, IOException { + public int exec(File source) throws IOException { try (Reader in = new BufferedReader(new FileReader(source))) { return exec(in); } } @@ -389,10 +395,10 @@ /** * Execute a set of statements from a string. + * * @param stmts the set of statements, separated by semicolons * @return the number of statements executed */ - public int exec(String stmts) { try (Reader in = new StringReader(stmts)) { return exec(in); diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index 94f5d85..6895d93 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -19,17 +19,17 @@ package org.apache.drill.test; import java.io.BufferedReader; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import org.apache.drill.PlanTestBase; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.common.expression.SchemaPath; @@ -61,6 +61,8 @@ import org.apache.drill.test.BufferingQueryEventListener.QueryEvent; import org.apache.drill.test.ClientFixture.StatementParser; import 
org.joda.time.Period; +import static org.junit.Assert.assertEquals; + /** * Builder for a Drill query. Provides all types of query formats, * and a variety of ways to run the query. @@ -73,7 +75,7 @@ public class QueryBuilder { * using a {@link QuerySummaryFuture}. */ - public class SummaryOnlyQueryEventListener implements UserResultsListener { + public static class SummaryOnlyQueryEventListener implements UserResultsListener { /** * The future to be notified. Created here and returned by the @@ -123,7 +125,7 @@ public class QueryBuilder { * just the summary of the query. */ - public class QuerySummaryFuture implements Future<QuerySummary> { + public static class QuerySummaryFuture implements Future<QuerySummary> { /** * Synchronizes the listener thread and the test thread that @@ -153,7 +155,7 @@ public class QueryBuilder { public boolean isDone() { return summary != null; } @Override - public QuerySummary get() throws InterruptedException, ExecutionException { + public QuerySummary get() throws InterruptedException { lock.await(); return summary; } @@ -163,8 +165,7 @@ public class QueryBuilder { */ @Override - public QuerySummary get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public QuerySummary get(long timeout, TimeUnit unit) throws InterruptedException { return get(); } @@ -177,7 +178,6 @@ public class QueryBuilder { /** * Summary results of a query: records, batches, run time. 
*/ - public static class QuerySummary { private final QueryId queryId; private final int records; @@ -261,7 +261,6 @@ public class QueryBuilder { * @param planFragments fragments that make up the plan * @return this builder */ - public QueryBuilder plan(List<PlanFragment> planFragments) { queryType = QueryType.EXECUTION; this.planFragments = planFragments; @@ -275,8 +274,7 @@ public class QueryBuilder { * optional ending semi-colon * @return this builder */ - - public QueryBuilder sql(File file) throws FileNotFoundException, IOException { + public QueryBuilder sql(File file) throws IOException { try (BufferedReader in = new BufferedReader(new FileReader(file))) { StatementParser parser = new StatementParser(in); String sql = parser.parseNext(); @@ -297,7 +295,6 @@ public class QueryBuilder { * @param resource Name of the resource * @return this builder */ - public QueryBuilder sqlResource(String resource) { sql(ClusterFixture.loadResource(resource)); return this; @@ -321,7 +318,6 @@ public class QueryBuilder { * @return the query summary * @throws Exception if anything goes wrong anywhere in the execution */ - public QuerySummary run() throws Exception { return produceSummary(withEventListener()); } @@ -330,9 +326,8 @@ public class QueryBuilder { * Run the query and return a list of the result batches. Use * if the batch count is small and you want to work with them. * @return a list of batches resulting from the query - * @throws RpcException + * @throws RpcException if anything goes wrong */ - public List<QueryDataBatch> results() throws RpcException { Preconditions.checkNotNull(queryType, "Query not provided."); Preconditions.checkNotNull(queryText, "Query not provided."); @@ -345,13 +340,12 @@ public class QueryBuilder { * by the code using a {@link RowSetReader}. * <p> * - * @see {@link #rowSetIterator()} for a version that reads a series of + * @see #rowSetIterator() for a version that reads a series of * batches as row sets. 
* @return a row set that represents the first non-empty batch returned from * the query * @throws RpcException if anything goes wrong */ - public DirectRowSet rowSet() throws RpcException { // Ignore all but the first non-empty batch. @@ -435,7 +429,6 @@ public class QueryBuilder { * @param <V> vector class * @param <T> return type * @return result produced by {@code reader} lambda or {@code null} if no records returned from the query - * */ @SuppressWarnings("unchecked") public <T, V> T vectorValue(String columnName, Class<V> vectorClass, VectorQueryReader<T, V> reader) @@ -543,7 +536,6 @@ public class QueryBuilder { * @return the value of the first column of the first row * @throws RpcException if anything goes wrong */ - public String singletonString() throws RpcException { return singletonGeneric(ScalarReader::getString); } @@ -554,7 +546,6 @@ public class QueryBuilder { * * @param listener the Drill listener */ - public void withListener(UserResultsListener listener) { Preconditions.checkNotNull(queryType, "Query not provided."); if (planFragments != null) { @@ -578,7 +569,6 @@ public class QueryBuilder { * * @return the query event listener */ - public BufferingQueryEventListener withEventListener() { BufferingQueryEventListener listener = new BufferingQueryEventListener(); withListener(listener); @@ -598,15 +588,12 @@ public class QueryBuilder { } /** - * <p> - * Run a query and logs the output in TSV format. - * Similar to {@link QueryTestUtil#testRunAndLog} with one query. - * </p> + * Run a query and logs the output in TSV format. + * Similar to {@link QueryTestUtil#testRunAndLog} with one query. * * @return The number of rows returned. - * @throws Exception If anything goes wrong with query execution. */ - public long log() throws Exception { + public long log() { return log(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); } @@ -623,15 +610,12 @@ public class QueryBuilder { } /** - * <p> - * Runs a query and prints the output to stdout in TSV format. 
- * Similar to {@link QueryTestUtil#testRunAndLog} with one query. - * </p> + * Runs a query and prints the output to stdout in TSV format. + * Similar to {@link QueryTestUtil#testRunAndLog} with one query. * * @return The number of rows returned. - * @throws Exception If anything goes wrong with query execution. */ - public long print() throws Exception { + public long print() { return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); } @@ -639,8 +623,9 @@ * Run the query asynchronously, returning a future to be used * to check for query completion, wait for completion, and obtain * the result summary. + * + * @return query summary future */ - public QuerySummaryFuture futureSummary() { QuerySummaryFuture future = new QuerySummaryFuture(); withListener(new SummaryOnlyQueryEventListener(future)); @@ -663,9 +648,10 @@ /** * Submit an "EXPLAIN" statement, and return text form of the * plan. + * + * @return explain plan * @throws Exception if the query fails */ - public String explainText() throws Exception { return explain(ClusterFixture.EXPLAIN_PLAN_TEXT); } @@ -673,9 +659,10 @@ /** * Submit an "EXPLAIN" statement, and return the JSON form of the * plan. + * + * @return explain plan * @throws Exception if the query fails */ - public String explainJson() throws Exception { return explain(ClusterFixture.EXPLAIN_PLAN_JSON); } @@ -685,12 +672,24 @@ return queryPlan(format); } + /** + * Submits explain plan statement + * and creates plan matcher instance based on returned query plan. 
+ * + * @return plan matcher + * @throws Exception if the query fails + */ + public PlanMatcher planMatcher() throws Exception { + String plan = explainText(); + return new PlanMatcher(plan); + } + private QuerySummary produceSummary(BufferingQueryEventListener listener) throws Exception { long start = System.currentTimeMillis(); int recordCount = 0; int batchCount = 0; QueryId queryId = null; - QueryState state = null; + QueryState state; loop: for (;;) { QueryEvent event = listener.get(); @@ -727,11 +726,13 @@ public class QueryBuilder { * Submit an "EXPLAIN" statement, and return the column value which * contains the plan's string. * <p> - * Cribbed from {@link PlanTestBase#getPlanInString(String, String)} - * @throws Exception if anything goes wrogn in the query + * Cribbed from PlanTestBase#getPlanInString(String, String) + * + * @param columnName column name to extract from result + * @return query plan + * @throws Exception if anything goes wrong in the query */ - - protected String queryPlan(String columnName) throws Exception { + private String queryPlan(String columnName) throws Exception { Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query."); final List<QueryDataBatch> results = results(); final RecordBatchLoader loader = new RecordBatchLoader(client.allocator()); @@ -764,4 +765,69 @@ public class QueryBuilder { return builder.toString(); } + + /** + * Collects expected and non-expected query patterns. + * Upon {@link #match()} method call, matches given patterns to the query plan. 
+ */ + public static class PlanMatcher { + + private static final String EXPECTED_NOT_FOUND = "Did not find expected pattern"; + private static final String UNEXPECTED_FOUND = "Found unwanted pattern"; + + private final String plan; + private final List<String> included = new ArrayList<>(); + private final List<String> excluded = new ArrayList<>(); + + public PlanMatcher(String plan) { + this.plan = plan; + } + + public PlanMatcher include(String... patterns) { + included.addAll(Arrays.asList(patterns)); + return this; + } + + public PlanMatcher exclude(String... patterns) { + excluded.addAll(Arrays.asList(patterns)); + return this; + } + + /** + * Checks if stored patterns (string parts) are included or excluded in the given plan. + * + * <p/> + * Example: <br/> + * For the plan: + * <pre> + * 00-00 Screen + * 00-01 Project(cnt=[$0]) + * 00-02 DirectScan(groupscan=[selectionRoot = classpath:/tpch/nation.parquet, + * numFiles = 1, usedMetadataSummaryFile = false, DynamicPojoRecordReader{records = [[25]]}]) + * </pre> + * + * <ul> + * <li>To check that number of files are 1 and DynamicPojoRecordReader is used: + * <code>planMatcher.include("numFiles = 1", "DynamicPojoRecordReader")</code></li> + * <li>To check that metadata summary file was not used: + * <code>planMatcher.exclude("usedMetadataSummaryFile = true")</code></li> + * </ul> + * + * Calling <code>planMatcher.match()</code> method would check that given patterns are present + * or absent in the given plan. Method execution will fail with {@link AssertionError} + * only if expected pattern was not matched or unexpected pattern was matched. 
+ */ + public void match() { + included.forEach(pattern -> match(pattern, true)); + excluded.forEach(pattern -> match(pattern, false)); + } + + private void match(String patternString, boolean expectedResult) { + Pattern pattern = Pattern.compile(patternString); + Matcher matcher = pattern.matcher(plan); + String message = String.format("%s in plan: %s\n%s", + expectedResult ? EXPECTED_NOT_FOUND : UNEXPECTED_FOUND, patternString, plan); + assertEquals(message, expectedResult, matcher.find()); + } + } }