This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 74b277d DRILL-7313: Use Hive schema for MaprDB native reader when field was empty 74b277d is described below commit 74b277d2779064e62cf589f1dfaeb25f9bb4527f Author: Volodymyr Vysotskyi <vvo...@gmail.com> AuthorDate: Tue Jul 2 17:25:59 2019 +0300 DRILL-7313: Use Hive schema for MaprDB native reader when field was empty - Added all_text_mode option for hive maprDB Json - Improved logic to convert Hive's schema into Drill's one - Added unit tests for schema conversion --- .../exec/store/mapr/db/MapRDBFormatMatcher.java | 3 +- .../exec/store/mapr/db/MapRDBFormatPlugin.java | 14 +- .../drill/exec/store/mapr/db/MapRDBGroupScan.java | 53 ++++-- .../store/mapr/db/MapRDBPushFilterIntoScan.java | 12 +- .../store/mapr/db/MapRDBPushLimitIntoScan.java | 2 +- .../mapr/db/MapRDBRestrictedScanBatchCreator.java | 4 +- .../exec/store/mapr/db/MapRDBScanBatchCreator.java | 4 +- .../drill/exec/store/mapr/db/MapRDBSubScan.java | 31 ++-- .../store/mapr/db/RestrictedMapRDBSubScan.java | 13 +- .../store/mapr/db/binary/BinaryTableGroupScan.java | 35 ++-- .../store/mapr/db/json/JsonTableGroupScan.java | 49 +++--- .../store/mapr/db/json/MaprDBJsonRecordReader.java | 11 +- .../mapr/db/json/RestrictedJsonRecordReader.java | 6 +- .../mapr/db/json/RestrictedJsonTableGroupScan.java | 20 ++- ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java | 38 +++-- .../planner/types/HiveToRelDataTypeConverter.java | 29 ++-- .../drill/exec/store/hive/HiveUtilities.java | 100 ++++++++++++ .../store/hive/schema/TestSchemaConversion.java | 178 +++++++++++++++++++++ .../java/org/apache/drill/exec/ExecConstants.java | 7 +- .../drill/exec/planner/sql/TypeInferenceUtils.java | 33 ++++ .../exec/server/options/SystemOptionManager.java | 1 + .../drill/exec/store/dfs/easy/EasyGroupScan.java | 5 +- .../exec/vector/complex/fn/JsonReaderUtils.java | 59 +++++-- .../FileSystemMetadataProviderManager.java | 31 ++++ 
.../java-exec/src/main/resources/drill-module.conf | 1 + .../src/main/codegen/templates/ComplexCopier.java | 15 +- .../exec/record/metadata/RepeatedListBuilder.java | 30 ++++ .../drill/metastore/util/SchemaPathUtils.java | 2 + 28 files changed, 645 insertions(+), 141 deletions(-) diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java index f982278..73c5ffb 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java @@ -86,7 +86,8 @@ public class MapRDBFormatMatcher extends TableFormatMatcher { dt.setGroupScan(fp.getGroupScan(userName, selection, null /* columns */, - (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc())); + (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc(), + null /* metadataProviderManager */)); return dt; } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java index 6501f8c..a401094 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.store.mapr.TableFormatPlugin; import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan; import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec; import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; +import org.apache.drill.metastore.MetadataProviderManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,9 
+47,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import com.mapr.db.index.IndexDesc; import com.mapr.fs.tables.TableProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MapRDBFormatPlugin extends TableFormatPlugin { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class); + private static final Logger logger = LoggerFactory.getLogger(MapRDBFormatPlugin.class); private final MapRDBFormatMatcher matcher; private final Configuration hbaseConf; @@ -115,25 +118,24 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { MapRDBPushLimitIntoScan.LIMIT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_RKJOIN); } - public AbstractGroupScan getGroupScan(String userName, FileSelection selection, - List<SchemaPath> columns, IndexDesc indexDesc) throws IOException { + List<SchemaPath> columns, IndexDesc indexDesc, MetadataProviderManager metadataProviderManager) throws IOException { String tableName = getTableName(selection); TableProperties props = getMaprFS().getTableProperties(new Path(tableName)); if (props.getAttr().getJson()) { JsonScanSpec scanSpec = new JsonScanSpec(tableName, indexDesc, null/*condition*/); - return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns); + return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns, metadataProviderManager); } else { HBaseScanSpec scanSpec = new HBaseScanSpec(tableName); - return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns); + return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns, metadataProviderManager); } } @Override public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException { - return getGroupScan(userName, selection, columns, (IndexDesc) null /* indexDesc */); + return 
getGroupScan(userName, selection, columns, (IndexDesc) null /* indexDesc */, null /* metadataProviderManager */); } public boolean supportsStatistics() { diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java index 07943d9..1f0b626 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java @@ -43,10 +43,13 @@ import org.apache.drill.exec.planner.index.IndexDiscover; import org.apache.drill.exec.planner.index.IndexDiscoverFactory; import org.apache.drill.exec.planner.index.MapRDBIndexDiscover; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.AbstractStoragePlugin; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.drill.metastore.metadata.TableMetadata; +import org.apache.drill.metastore.metadata.TableMetadataProvider; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -56,6 +59,9 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Sets; public abstract class MapRDBGroupScan extends AbstractDbGroupScan { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class); + private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size); + private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR); + protected AbstractStoragePlugin storagePlugin; 
protected MapRDBFormatPlugin formatPlugin; @@ -74,14 +80,9 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan { private Stopwatch watch = Stopwatch.createUnstarted(); - private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() { - @Override - public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) { - return list1.size() - list2.size(); - } - }; + private TableMetadataProvider metadataProvider; - private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR); + private TableMetadata tableMetadata; public MapRDBGroupScan(MapRDBGroupScan that) { super(that); @@ -96,27 +97,30 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan { * during the copy-constructor */ this.doNotAccessRegionsToScan = that.doNotAccessRegionsToScan; + this.metadataProvider = that.metadataProvider; } public MapRDBGroupScan(AbstractStoragePlugin storagePlugin, - MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) { + MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName, + TableMetadataProvider metadataProvider) { super(userName); this.storagePlugin = storagePlugin; this.formatPlugin = formatPlugin; - this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig(); + this.formatPluginConfig = formatPlugin.getConfig(); this.columns = columns; + this.metadataProvider = metadataProvider; } @Override public List<EndpointAffinity> getOperatorAffinity() { watch.reset(); watch.start(); - Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>(); + Map<String, DrillbitEndpoint> endpointMap = new HashMap<>(); for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) { endpointMap.put(ep.getAddress(), ep); } - final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>(); + final 
Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>(); for (String serverName : getRegionsToScan().values()) { DrillbitEndpoint ep = endpointMap.get(serverName); if (ep != null) { @@ -230,9 +234,9 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan { * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more. */ while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) { - List<MapRDBSubScanSpec> smallestList = (List<MapRDBSubScanSpec>) minHeap.poll(); - List<MapRDBSubScanSpec> largestList = (List<MapRDBSubScanSpec>) maxHeap.poll(); - smallestList.add(largestList.remove(largestList.size()-1)); + List<MapRDBSubScanSpec> smallestList = minHeap.poll(); + List<MapRDBSubScanSpec> largestList = maxHeap.poll(); + smallestList.add(largestList.remove(largestList.size() - 1)); if (largestList.size() > minPerEndpointSlot) { maxHeap.offer(largestList); } @@ -344,4 +348,25 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan { public PluginCost getPluginCostModel() { return formatPlugin.getPluginCostModel(); } + + @JsonProperty + public TupleMetadata getSchema() { + return getTableMetadata().getSchema(); + } + + @Override + @JsonIgnore + public TableMetadataProvider getMetadataProvider() { + return metadataProvider; + } + + @Override + @JsonIgnore + public TableMetadata getTableMetadata() { + if (tableMetadata == null) { + tableMetadata = metadataProvider.getTableMetadata(); + } + return tableMetadata; + } + } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java index 6bd3368..d5afbdb 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java @@ 
-41,7 +41,6 @@ import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class); private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) { super(operand, description); @@ -100,7 +99,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul @Override public boolean matches(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(2); + final ScanPrel scan = call.rel(2); if (scan.getGroupScan() instanceof BinaryTableGroupScan || scan.getGroupScan() instanceof JsonTableGroupScan) { return super.matches(call); @@ -124,7 +123,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul return; } - LogicalExpression conditionExp = null; + LogicalExpression conditionExp; try { conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); } catch (ClassCastException e) { @@ -182,9 +181,10 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul } // Pass tableStats from old groupScan so we do not go and fetch stats (an expensive operation) again from MapR DB client. 
- final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), - groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns(), - groupScan.getTableStats()); + final BinaryTableGroupScan newGroupsScan = + new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), + groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns(), + groupScan.getTableStats(), groupScan.getMetadataProvider()); newGroupsScan.setFilterPushedDown(true); final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java index 28d59d0..325277b 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java @@ -171,7 +171,7 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule oldScanSpec.getStopRow(), oldScanSpec.getFilter()); return new BinaryTableGroupScan(binaryTableGroupScan.getUserName(), binaryTableGroupScan.getStoragePlugin(), binaryTableGroupScan.getFormatPlugin(), newScanSpec, binaryTableGroupScan.getColumns(), - binaryTableGroupScan.getTableStats()).applyLimit(offset + fetch); + binaryTableGroupScan.getTableStats(), binaryTableGroupScan.getMetadataProvider()).applyLimit(offset + fetch); } return null; } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java index cb3732a..bdea6e8 100644 --- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java @@ -38,8 +38,8 @@ public class MapRDBRestrictedScanBatchCreator implements BatchCreator<Restricted List<RecordReader> readers = Lists.newArrayList(); for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){ try { - readers.add(new RestrictedJsonRecordReader((RestrictedMapRDBSubScanSpec)scanSpec, subScan.getFormatPlugin(), subScan.getColumns(), - context, subScan.getMaxRecordsToRead())); + readers.add(new RestrictedJsonRecordReader(scanSpec, subScan.getFormatPlugin(), subScan.getColumns(), + context, subScan.getMaxRecordsToRead(), subScan.getSchema())); } catch (Exception e1) { throw new ExecutionSetupException(e1); } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java index 2f53398..6fe467f 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java @@ -34,7 +34,6 @@ import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class); @Override public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { @@ -48,7 +47,8 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> { getHBaseSubScanSpec(scanSpec), subScan.getColumns())); } else { - 
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPlugin(), subScan.getColumns(), context, subScan.getMaxRecordsToRead())); + readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPlugin(), + subScan.getColumns(), context, subScan.getMaxRecordsToRead(), subScan.getSchema())); } } catch (Exception e) { throw new ExecutionSetupException(e); diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java index 220b90e..9b41243 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.mapr.db; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -28,6 +29,8 @@ import org.apache.drill.exec.physical.base.AbstractDbSubScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; import org.apache.drill.exec.store.StoragePluginRegistry; import com.fasterxml.jackson.annotation.JacksonInject; @@ -35,19 +38,18 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; // Class containing information for reading a single HBase region @JsonTypeName("maprdb-sub-scan") public class MapRDBSubScan extends AbstractDbSubScan { - static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class); private final MapRDBFormatPlugin formatPlugin; private final List<MapRDBSubScanSpec> regionScanSpecList; private final List<SchemaPath> columns; private final int maxRecordsToRead; private final String tableType; + private final TupleMetadata schema; @JsonCreator public MapRDBSubScan(@JacksonInject StoragePluginRegistry engineRegistry, @@ -57,34 +59,38 @@ public class MapRDBSubScan extends AbstractDbSubScan { @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList, @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("maxRecordsToRead") int maxRecordsToRead, - @JsonProperty("tableType") String tableType) throws ExecutionSetupException { + @JsonProperty("tableType") String tableType, + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata + @JsonProperty("schema") TupleSchema schema) throws ExecutionSetupException { this(userName, (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig), regionScanSpecList, columns, maxRecordsToRead, - tableType); + tableType, + schema); } public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin, - List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) { - this(userName, formatPlugin, maprSubScanSpecs, columns, -1, tableType); + List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType, TupleMetadata schema) { + this(userName, formatPlugin, maprSubScanSpecs, columns, -1, tableType, schema); } public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin, - List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType) { + List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType, TupleMetadata schema) { super(userName); this.formatPlugin = formatPlugin; this.regionScanSpecList = maprSubScanSpecs; this.columns = columns; 
this.maxRecordsToRead = maxRecordsToRead; this.tableType = tableType; + this.schema = schema; } @JsonProperty("formatPluginConfig") public MapRDBFormatPluginConfig getFormatPluginConfig() { - return (MapRDBFormatPluginConfig) formatPlugin.getConfig(); + return formatPlugin.getConfig(); } @JsonProperty("storageConfig") @@ -112,6 +118,11 @@ public class MapRDBSubScan extends AbstractDbSubScan { return tableType; } + @JsonProperty("schema") + public TupleMetadata getSchema() { + return schema; + } + @Override public boolean isExecutable() { return false; @@ -125,12 +136,12 @@ public class MapRDBSubScan extends AbstractDbSubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - return new MapRDBSubScan(getUserName(), formatPlugin, regionScanSpecList, columns, tableType); + return new MapRDBSubScan(getUserName(), formatPlugin, regionScanSpecList, columns, tableType, schema); } @Override public Iterator<PhysicalOperator> iterator() { - return ImmutableSet.<PhysicalOperator>of().iterator(); + return Collections.emptyIterator(); } @Override diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java index eedbca5..8b696c7 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java @@ -29,6 +29,8 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.impl.join.RowKeyJoin; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; import 
org.apache.drill.exec.store.StoragePluginRegistry; /** @@ -47,15 +49,18 @@ public class RestrictedMapRDBSubScan extends MapRDBSubScan { @JsonProperty("regionScanSpecList") List<RestrictedMapRDBSubScanSpec> regionScanSpecList, @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("maxRecordsToRead") int maxRecordsToRead, - @JsonProperty("tableType") String tableType) throws ExecutionSetupException { + @JsonProperty("tableType") String tableType, + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata + @JsonProperty("schema") TupleSchema schema) throws ExecutionSetupException { this(userName, (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig), - regionScanSpecList, columns, maxRecordsToRead, tableType); + regionScanSpecList, columns, maxRecordsToRead, tableType, schema); } public RestrictedMapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin, - List<RestrictedMapRDBSubScanSpec> maprDbSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType) { - super(userName, formatPlugin, new ArrayList<MapRDBSubScanSpec>(), columns, maxRecordsToRead, tableType); + List<RestrictedMapRDBSubScanSpec> maprDbSubScanSpecs, + List<SchemaPath> columns, int maxRecordsToRead, String tableType, TupleMetadata schema) { + super(userName, formatPlugin, new ArrayList<>(), columns, maxRecordsToRead, tableType, schema); for(RestrictedMapRDBSubScanSpec restrictedSpec : maprDbSubScanSpecs) { getRegionScanSpecList().add(restrictedSpec); diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java index a135464..96f96ca 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java @@ -32,6 
+32,7 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.record.metadata.TupleSchema; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.planner.index.Statistics; import org.apache.drill.exec.store.StoragePluginRegistry; @@ -46,6 +47,9 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScan; import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; import org.apache.drill.exec.store.mapr.db.MapRDBTableStats; import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo; +import org.apache.drill.metastore.FileSystemMetadataProviderManager; +import org.apache.drill.metastore.MetadataProviderManager; +import org.apache.drill.metastore.metadata.TableMetadataProvider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -60,10 +64,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @JsonTypeName("maprdb-binary-scan") public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class); + private static final Logger logger = LoggerFactory.getLogger(BinaryTableGroupScan.class); public static final String TABLE_BINARY = "binary"; @@ -79,24 +85,25 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC @JsonProperty("storage") FileSystemConfig storagePluginConfig, @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig, 
@JsonProperty("columns") List<SchemaPath> columns, - @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { - this (userName, - (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), - (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig), - scanSpec, columns); + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata + @JsonProperty("schema") TupleSchema schema, + @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException, IOException { + this(userName, (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), + (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig), + scanSpec, columns, null /* tableStats */, FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema)); } public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin, - MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) { - super(storagePlugin, formatPlugin, columns, userName); - this.hbaseScanSpec = scanSpec; - init(); + MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns, + MetadataProviderManager metadataProviderManager) throws IOException { + this(userName, storagePlugin, formatPlugin, scanSpec, + columns, null /* tableStats */, FileSystemMetadataProviderManager.getMetadataProvider(metadataProviderManager)); } public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin, - MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, - List<SchemaPath> columns, MapRDBTableStats tableStats) { - super(storagePlugin, formatPlugin, columns, userName); + MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, + List<SchemaPath> columns, MapRDBTableStats tableStats, TableMetadataProvider metadataProvider) { + super(storagePlugin, formatPlugin, columns, userName, metadataProvider); this.hbaseScanSpec = scanSpec; 
this.tableStats = tableStats; init(); @@ -173,7 +180,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC assert minorFragmentId < endpointFragmentMapping.size() : String.format( "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), minorFragmentId); - return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY); + return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY, getSchema()); } @Override diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java index 24f370a..414c7f3 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java @@ -43,6 +43,7 @@ import org.apache.drill.exec.physical.base.IndexGroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.record.metadata.TupleSchema; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.planner.index.IndexDescriptor; import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor; @@ -64,6 +65,9 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; import org.apache.drill.exec.store.mapr.db.MapRDBTableStats; import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo; import org.apache.drill.exec.util.Utilities; +import org.apache.drill.metastore.FileSystemMetadataProviderManager; +import org.apache.drill.metastore.MetadataProviderManager; +import 
org.apache.drill.metastore.metadata.TableMetadataProvider; import org.codehaus.jackson.annotate.JsonCreator; import org.ojai.store.QueryCondition; @@ -119,26 +123,25 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca @JsonProperty("storage") FileSystemConfig storagePluginConfig, @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig, @JsonProperty("columns") List<SchemaPath> columns, - @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { - this (userName, - (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), - (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig), - scanSpec, columns); + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata + @JsonProperty("schema") TupleSchema schema, + @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException, IOException { + this(userName, (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), + (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig), + scanSpec, columns, new MapRDBStatistics(), FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema)); } public JsonTableGroupScan(String userName, AbstractStoragePlugin storagePlugin, - MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) { - super(storagePlugin, formatPlugin, columns, userName); - this.scanSpec = scanSpec; - this.stats = new MapRDBStatistics(); - this.forcedRowCountMap = new HashMap<>(); - init(); + MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns, + MetadataProviderManager metadataProviderManager) throws IOException { + this(userName, storagePlugin, formatPlugin, scanSpec, columns, + new MapRDBStatistics(), FileSystemMetadataProviderManager.getMetadataProvider(metadataProviderManager)); } - public JsonTableGroupScan(String userName, FileSystemPlugin 
storagePlugin, + public JsonTableGroupScan(String userName, AbstractStoragePlugin storagePlugin, MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns, - MapRDBStatistics stats) { - super(storagePlugin, formatPlugin, columns, userName); + MapRDBStatistics stats, TableMetadataProvider metadataProvider) { + super(storagePlugin, formatPlugin, columns, userName, metadataProvider); this.scanSpec = scanSpec; this.stats = stats; this.forcedRowCountMap = new HashMap<>(); @@ -293,7 +296,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca assert minorFragmentId < endpointFragmentMapping.size() : String.format( "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), minorFragmentId); - return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON); + return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON, getSchema()); } @Override @@ -471,15 +474,21 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca @Override public RestrictedJsonTableGroupScan getRestrictedScan(List<SchemaPath> columns) { - RestrictedJsonTableGroupScan newScan = - new RestrictedJsonTableGroupScan(this.getUserName(), + try { + RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this.getUserName(), (FileSystemPlugin) this.getStoragePlugin(), this.getFormatPlugin(), this.getScanSpec(), this.getColumns(), - this.getStatistics()); - newScan.columns = columns; - return newScan; + this.getStatistics(), + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata + (TupleSchema) this.getSchema()); + + newScan.columns = columns; + return newScan; + } catch (IOException e) { + throw new DrillRuntimeException("Error happened when constructing RestrictedJsonTableGroupScan", e); + } } /** 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 2d32e9f..5e80e62 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; @@ -122,6 +123,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private final boolean ignoreSchemaChange; private final boolean disableCountOptimization; private final boolean nonExistentColumnsProjection; + private final TupleMetadata schema; protected final MapRDBSubScanSpec subScanSpec; protected final MapRDBFormatPlugin formatPlugin; @@ -133,8 +135,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { protected Document lastDocument; public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin, - List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) { - this(subScanSpec, formatPlugin, projectedColumns, context); + List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords, TupleMetadata schema) { + this(subScanSpec, formatPlugin, projectedColumns, context, schema); this.maxRecordsToRead = maxRecords; this.lastDocumentReader = null; this.lastDocument = null; @@ -142,12 +144,13 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader { } protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin, - List<SchemaPath> projectedColumns, FragmentContext context) { + List<SchemaPath> projectedColumns, FragmentContext context, TupleMetadata schema) { buffer = context.getManagedBuffer(); final Path tablePath = new Path(Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName()); this.subScanSpec = subScanSpec; this.formatPlugin = formatPlugin; + this.schema = schema; final IndexDesc indexDesc = subScanSpec.getIndexDesc(); byte[] serializedFilter = subScanSpec.getSerializedFilter(); condition = null; @@ -445,7 +448,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } if (nonExistentColumnsProjection && recordCount > 0) { - JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.emptyList()); + JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), schema, allTextMode, Collections.emptyList()); } vectorWriter.setValueCount(recordCount); if (maxRecordsToRead > 0) { diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java index bf150c1..a382163 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.physical.impl.join.RowKeyJoin; import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; import 
org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec; @@ -61,9 +62,10 @@ public class RestrictedJsonRecordReader extends MaprDBJsonRecordReader { MapRDBFormatPlugin formatPlugin, List<SchemaPath> projectedColumns, FragmentContext context, - int maxRecordsToRead) { + int maxRecordsToRead, + TupleMetadata schema) { - super(subScanSpec, formatPlugin, projectedColumns, context, maxRecordsToRead); + super(subScanSpec, formatPlugin, projectedColumns, context, maxRecordsToRead, schema); batchSize = (int)context.getOptions().getOption(ExecConstants.QUERY_ROWKEYJOIN_BATCHSIZE); int idx = 0; FieldPath[] scannedFields = this.getScannedFields(); diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java index 055c5a5..e1285eb 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.mapr.db.json; +import java.io.IOException; import java.util.List; import java.util.NavigableMap; @@ -24,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.metastore.FileSystemMetadataProviderManager; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -53,12 +56,15 @@ public class RestrictedJsonTableGroupScan extends JsonTableGroupScan { @JsonCreator public 
RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName, - @JsonProperty("storage") FileSystemPlugin storagePlugin, - @JsonProperty("format") MapRDBFormatPlugin formatPlugin, - @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */ - @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("") MapRDBStatistics statistics) { - super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics); + @JsonProperty("storage") FileSystemPlugin storagePlugin, + @JsonProperty("format") MapRDBFormatPlugin formatPlugin, + @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */ + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("") MapRDBStatistics statistics, + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata + @JsonProperty("schema") TupleSchema schema) throws IOException { + super(userName, storagePlugin, formatPlugin, scanSpec, columns, + statistics, FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema)); } // TODO: this method needs to be fully implemented @@ -82,7 +88,7 @@ public class RestrictedJsonTableGroupScan extends JsonTableGroupScan { minorFragmentId); RestrictedMapRDBSubScan subscan = new RestrictedMapRDBSubScan(getUserName(), formatPlugin, - getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON); + getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON, getSchema()); return subscan; } diff --git a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java index 7aca59d..8b18347 100644 --- a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java +++ 
b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java @@ -21,13 +21,16 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.planner.index.MapRDBStatistics; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.hive.HiveMetadataProvider; import org.apache.drill.exec.store.hive.HiveReadEntry; @@ -37,9 +40,11 @@ import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig; import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec; import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; +import org.apache.drill.metastore.FileSystemMetadataProviderManager; import org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat; import org.ojai.DocumentConstants; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -105,7 +110,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi To ensure Drill MapR-DB Json scan will be chosen, reduce Hive scan importance to 0. 
*/ call.getPlanner().setImportance(hiveScanRel, 0.0); - } catch (DrillRuntimeException e) { + } catch (Exception e) { // TODO: Improve error handling after allowing to throw IOException from StoragePlugin.getFormatPlugin() logger.warn("Failed to convert HiveScan to JsonScanSpec. Fallback to HiveMapR-DB connector.", e); } @@ -114,28 +119,43 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi /** * Helper method which creates a DrillScanRel with native Drill HiveScan. */ - private DrillScanRel createNativeScanRel(DrillScanRel hiveScanRel, PlannerSettings settings) { + private DrillScanRel createNativeScanRel(DrillScanRel hiveScanRel, PlannerSettings settings) throws IOException { RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory(); HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan(); - Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters(); + HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry(); + Map<String, String> parameters = hiveReadEntry.getHiveTableWrapper().getParameters(); JsonScanSpec scanSpec = new JsonScanSpec(parameters.get(MAPRDB_TABLE_NAME), null, null); List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream() .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath)) .collect(Collectors.toList()); + + // creates TupleMetadata based on Hive's schema (with optional data modes) to be used in the reader + // for the case when column type wasn't discovered + HiveToRelDataTypeConverter dataTypeConverter = new HiveToRelDataTypeConverter(typeFactory); + TupleMetadata schema = new TupleSchema(); + hiveReadEntry.getTable().getColumnListsCache().getTableSchemaColumns() + .forEach(column -> schema.addColumn(HiveUtilities.getColumnMetadata(dataTypeConverter, column))); + + MapRDBFormatPluginConfig formatConfig = new MapRDBFormatPluginConfig(); + + formatConfig.readTimestampWithZoneOffset = + 
settings.getOptions().getBoolean(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET); + + formatConfig.allTextMode = settings.getOptions().getBoolean(ExecConstants.HIVE_MAPRDB_JSON_ALL_TEXT_MODE); + JsonTableGroupScan nativeMapRDBScan = new JsonTableGroupScan( hiveScan.getUserName(), hiveScan.getStoragePlugin(), // TODO: We should use Hive format plugins here, once it will be implemented. DRILL-6621 - (MapRDBFormatPlugin) hiveScan.getStoragePlugin().getFormatPlugin(new MapRDBFormatPluginConfig()), + (MapRDBFormatPlugin) hiveScan.getStoragePlugin().getFormatPlugin(formatConfig), scanSpec, - hiveScanCols + hiveScanCols, + new MapRDBStatistics(), + FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema) ); - nativeMapRDBScan.getFormatPlugin().getConfig().readTimestampWithZoneOffset = - settings.getOptions().getBoolean(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET); - List<String> nativeScanColNames = hiveScanRel.getRowType().getFieldList().stream() .map(field -> replaceOverriddenColumnId(parameters, field.getName())) .collect(Collectors.toList()); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java index 3b3abf2..6cb88ae 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java @@ -70,45 +70,44 @@ public class HiveToRelDataTypeConverter { */ public RelDataType convertToNullableRelDataType(FieldSchema field) { TypeInfo fieldTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType()); - RelDataType relDataType = convertToRelDataType(fieldTypeInfo); - return typeFactory.createTypeWithNullability(relDataType, true); + return convertToRelDataType(fieldTypeInfo, 
true); } - private RelDataType convertToRelDataType(TypeInfo typeInfo) { + private RelDataType convertToRelDataType(TypeInfo typeInfo, boolean nullable) { final Category typeCategory = typeInfo.getCategory(); switch (typeCategory) { case PRIMITIVE: - return getRelDataType((PrimitiveTypeInfo) typeInfo); + return typeFactory.createTypeWithNullability(getRelDataType((PrimitiveTypeInfo) typeInfo), nullable); case LIST: - return getRelDataType((ListTypeInfo) typeInfo); + return typeFactory.createTypeWithNullability(getRelDataType((ListTypeInfo) typeInfo, nullable), nullable); case MAP: - return getRelDataType((MapTypeInfo) typeInfo); + return typeFactory.createTypeWithNullability(getRelDataType((MapTypeInfo) typeInfo, nullable), nullable); case STRUCT: - return getRelDataType((StructTypeInfo) typeInfo); + return typeFactory.createTypeWithNullability(getRelDataType((StructTypeInfo) typeInfo, nullable), nullable); case UNION: logger.warn("There is no UNION data type in SQL. Converting it to Sql type OTHER to avoid " + "breaking INFORMATION_SCHEMA queries"); - return typeFactory.createSqlType(SqlTypeName.OTHER); + return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.OTHER), nullable); } throw new RuntimeException(String.format(UNSUPPORTED_HIVE_DATA_TYPE_ERROR_MSG, typeCategory)); } - private RelDataType getRelDataType(StructTypeInfo structTypeInfo) { + private RelDataType getRelDataType(StructTypeInfo structTypeInfo, boolean nullable) { final List<String> fieldNames = structTypeInfo.getAllStructFieldNames(); final List<RelDataType> relDataTypes = structTypeInfo.getAllStructFieldTypeInfos().stream() - .map(this::convertToRelDataType) + .map(typeInfo -> convertToRelDataType(typeInfo, nullable)) .collect(Collectors.toList()); return typeFactory.createStructType(relDataTypes, fieldNames); } - private RelDataType getRelDataType(MapTypeInfo mapTypeInfo) { - RelDataType keyType = convertToRelDataType(mapTypeInfo.getMapKeyTypeInfo()); - RelDataType 
valueType = convertToRelDataType(mapTypeInfo.getMapValueTypeInfo()); + private RelDataType getRelDataType(MapTypeInfo mapTypeInfo, boolean nullable) { + RelDataType keyType = convertToRelDataType(mapTypeInfo.getMapKeyTypeInfo(), nullable); + RelDataType valueType = convertToRelDataType(mapTypeInfo.getMapValueTypeInfo(), nullable); return typeFactory.createMapType(keyType, valueType); } - private RelDataType getRelDataType(ListTypeInfo listTypeInfo) { - RelDataType listElemTypeInfo = convertToRelDataType(listTypeInfo.getListElementTypeInfo()); + private RelDataType getRelDataType(ListTypeInfo listTypeInfo, boolean nullable) { + RelDataType listElemTypeInfo = convertToRelDataType(listTypeInfo.getListElementTypeInfo(), nullable); return typeFactory.createArrayType(listElemTypeInfo, -1); } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java index 1b1c3e3..4920f17 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java @@ -17,6 +17,18 @@ */ package org.apache.drill.exec.store.hive; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.planner.sql.TypeInferenceUtils; +import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MapColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import 
org.apache.drill.exec.record.metadata.TupleSchema; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Strings; import io.netty.buffer.DrillBuf; @@ -758,5 +770,93 @@ public class HiveUtilities { return new HiveTableWrapper.HivePartitionWrapper(new HivePartition(partition, listIndex)); } + /** + * Converts specified {@code RelDataType relDataType} into {@link ColumnMetadata}. + * For the case when specified relDataType is struct, map with recursively converted children + * will be created. + * + * @param name field name + * @param relDataType field type + * @return {@link ColumnMetadata} which corresponds to specified {@code RelDataType relDataType} + */ + public static ColumnMetadata getColumnMetadata(String name, RelDataType relDataType) { + switch (relDataType.getSqlTypeName()) { + case ARRAY: + return getArrayMetadata(name, relDataType); + case MAP: + case OTHER: + throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName())); + default: + if (relDataType.isStruct()) { + return getStructMetadata(name, relDataType); + } else { + return new PrimitiveColumnMetadata( + MaterializedField.create(name, + TypeInferenceUtils.getDrillMajorTypeFromCalciteType(relDataType))); + } + } + } + + /** + * Returns {@link ColumnMetadata} instance which corresponds to specified array {@code RelDataType relDataType}. 
+ * + * @param name name of the field + * @param relDataType the source of type information to construct the schema + * @return {@link ColumnMetadata} instance + */ + private static ColumnMetadata getArrayMetadata(String name, RelDataType relDataType) { + RelDataType componentType = relDataType.getComponentType(); + ColumnMetadata childColumnMetadata = getColumnMetadata(name, componentType); + switch (componentType.getSqlTypeName()) { + case ARRAY: + // for the case when nested type is array, it should be placed into repeated list + return MetadataUtils.newRepeatedList(name, childColumnMetadata); + case MAP: + case OTHER: + throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName())); + default: + if (componentType.isStruct()) { + // for the case when nested type is struct, it should be placed into repeated map + return MetadataUtils.newMapArray(name, childColumnMetadata.mapSchema()); + } else { + // otherwise creates column metadata with repeated data mode + return new PrimitiveColumnMetadata( + MaterializedField.create(name, + Types.overrideMode( + TypeInferenceUtils.getDrillMajorTypeFromCalciteType(componentType), + DataMode.REPEATED))); + } + } + } + + /** + * Returns {@link MapColumnMetadata} column metadata created based on specified {@code RelDataType relDataType} with + * converted to {@link ColumnMetadata} {@code relDataType}'s children. 
+ * + * @param name name of the field + * @param relDataType {@link RelDataType} the source of the children for resulting schema + * @return {@link MapColumnMetadata} column metadata + */ + private static MapColumnMetadata getStructMetadata(String name, RelDataType relDataType) { + TupleMetadata mapSchema = new TupleSchema(); + for (RelDataTypeField relDataTypeField : relDataType.getFieldList()) { + mapSchema.addColumn(getColumnMetadata(relDataTypeField.getName(), relDataTypeField.getType())); + } + return MetadataUtils.newMap(name, mapSchema); + } + + /** + * Converts specified {@code FieldSchema column} into {@link ColumnMetadata}. + * For the case when specified relDataType is struct, map with recursively converted children + * will be created. + * + * @param dataTypeConverter converter to obtain Calcite's types from Hive's ones + * @param column column to convert + * @return {@link ColumnMetadata} which corresponds to specified {@code FieldSchema column} + */ + public static ColumnMetadata getColumnMetadata(HiveToRelDataTypeConverter dataTypeConverter, FieldSchema column) { + RelDataType relDataType = dataTypeConverter.convertToNullableRelDataType(column); + return getColumnMetadata(column.getName(), relDataType); + } } diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/schema/TestSchemaConversion.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/schema/TestSchemaConversion.java new file mode 100644 index 0000000..7154c1b --- /dev/null +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/schema/TestSchemaConversion.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive.schema; + +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem; +import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.store.hive.HiveUtilities; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; + +@Category({SlowTest.class}) +public class TestSchemaConversion { + private static final HiveToRelDataTypeConverter dataTypeConverter + = new HiveToRelDataTypeConverter(new SqlTypeFactoryImpl(new DrillRelDataTypeSystem())); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testPrimitiveSchema() { + verifyConversion("int", Types.optional(TypeProtos.MinorType.INT)); + verifyConversion("varchar(123)", 
TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.VARCHAR).setPrecision(123).build()); + verifyConversion("timestamp", Types.optional(TypeProtos.MinorType.TIMESTAMP)); + } + + @Test + public void testStructSchema() { + ColumnMetadata expectedSchema = new SchemaBuilder() + .addMap("a") + .addNullable("t1", TypeProtos.MinorType.BIT) + .addNullable("t2", TypeProtos.MinorType.INT) + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("struct<t1:boolean,t2:int>", expectedSchema); + + expectedSchema = new SchemaBuilder() + .addMap("a") + .addNullable("t1", TypeProtos.MinorType.BIT) + .addMap("t2") + .addNullable("t3", TypeProtos.MinorType.VARDECIMAL, 38, 8) + .resumeMap() + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("struct<t1:boolean,t2:struct<t3:decimal(38,8)>>", expectedSchema); + } + + @Test + public void testRepeatedSchema() { + verifyConversion("array<boolean>", Types.repeated(TypeProtos.MinorType.BIT)); + + ColumnMetadata expectedSchema = new SchemaBuilder() + .addRepeatedList("a") + .addArray(TypeProtos.MinorType.BIT) + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("array<array<boolean>>", expectedSchema); + + expectedSchema = new SchemaBuilder() + .addRepeatedList("a") + .addDimension() + .addArray(TypeProtos.MinorType.BIT) + .resumeList() + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("array<array<array<boolean>>>", expectedSchema); + } + + @Test + public void testRepeatedStructSchema() { + ColumnMetadata expectedSchema = new SchemaBuilder() + .addMapArray("a") + .addNullable("t1", TypeProtos.MinorType.VARCHAR, 999) + .addNullable("t2", TypeProtos.MinorType.INT) + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("array<struct<t1:varchar(999),t2:int>>", expectedSchema); + + expectedSchema = new SchemaBuilder() + .addRepeatedList("a") + .addMapArray() + .addNullable("t1", TypeProtos.MinorType.VARCHAR, 999) + .addNullable("t2", 
TypeProtos.MinorType.INT) + .resumeList() + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("array<array<struct<t1:varchar(999),t2:int>>>", expectedSchema); + + expectedSchema = new SchemaBuilder() + .addRepeatedList("a") + .addDimension() + .addMapArray() + .addRepeatedList("t1") + .addArray(TypeProtos.MinorType.VARCHAR, 999) + .resumeMap() + .addArray("t2", TypeProtos.MinorType.VARDECIMAL, 28, 14) + .resumeList() + .resumeList() + .resumeSchema() + .buildSchema() + .metadata(0); + verifyConversion("array<array<array<struct<t1:array<array<varchar(999)>>,t2:array<decimal(28,14)>>>>>", expectedSchema); + } + + @Test + public void testUnionSchema() { + thrown.expect(UnsupportedOperationException.class); + convertType("uniontype<int,double>"); + } + + @Test + public void testListUnionSchema() { + thrown.expect(UnsupportedOperationException.class); + convertType("array<uniontype<int,double>>"); + } + + @Test + public void testStructUnionSchema() { + thrown.expect(UnsupportedOperationException.class); + convertType("struct<a:uniontype<int,double>,b:int>"); + } + + @Test + public void testMapSchema() { + thrown.expect(UnsupportedOperationException.class); + convertType("map<int,varchar(23)>"); + } + + @Test + public void testRepeatedMapSchema() { + thrown.expect(UnsupportedOperationException.class); + convertType("array<map<int,varchar(23)>>"); + } + + private void verifyConversion(String hiveType, TypeProtos.MajorType type) { + assertEquals(new PrimitiveColumnMetadata("a", type).columnString(), convertType(hiveType).columnString()); + } + + private void verifyConversion(String hiveType, ColumnMetadata expectedSchema) { + assertEquals(expectedSchema.columnString(), convertType(hiveType).columnString()); + } + + private ColumnMetadata convertType(String type) { + return HiveUtilities.getColumnMetadata(dataTypeConverter, new FieldSchema("a", type, "")); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index a7bfd96..1815855 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -531,7 +531,12 @@ public final class ExecConstants { public static final String HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET = "store.hive.maprdb_json.read_timestamp_with_timezone_offset"; public static final OptionValidator HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET_VALIDATOR = new BooleanValidator(HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET, - new OptionDescription("Enables Drill to read timestamp values with timezone offset when hive plugin is used and Drill native MaprDB JSON reader usage is enabled. (Drill 1.16+)")); + new OptionDescription("Enables Drill to read timestamp values with timezone offset when Hive plugin is used and Drill native MaprDB JSON reader usage is enabled. (Drill 1.16+)")); + + public static final String HIVE_MAPRDB_JSON_ALL_TEXT_MODE = "store.hive.maprdb_json.all_text_mode"; + public static final OptionValidator HIVE_MAPRDB_JSON_ALL_TEXT_MODE_VALIDATOR = + new BooleanValidator(HIVE_MAPRDB_JSON_ALL_TEXT_MODE, + new OptionDescription("Drill reads all data from the MaprDB JSON tables as VARCHAR when Hive plugin is used and Drill native MaprDB JSON reader usage is enabled. Prevents schema change errors.
(Drill 1.17+)")); public static final String HIVE_CONF_PROPERTIES = "store.hive.conf.properties"; public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new StringValidator(HIVE_CONF_PROPERTIES, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java index 9b01424..2953314 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java @@ -226,6 +226,39 @@ public class TypeInferenceUtils { } /** + * Returns {@link TypeProtos.MajorType} instance which corresponds to specified {@code RelDataType relDataType} + * with its nullability, scale and precision if it is available. + * + * @param relDataType RelDataType to convert + * @return {@link TypeProtos.MajorType} instance + */ + public static TypeProtos.MajorType getDrillMajorTypeFromCalciteType(RelDataType relDataType) { + final SqlTypeName sqlTypeName = relDataType.getSqlTypeName(); + + TypeProtos.MinorType minorType = getDrillTypeFromCalciteType(sqlTypeName); + TypeProtos.MajorType.Builder typeBuilder = TypeProtos.MajorType.newBuilder().setMinorType(minorType); + switch (minorType) { + case VAR16CHAR: + case VARCHAR: + case VARBINARY: + case TIMESTAMP: + if (relDataType.getPrecision() > 0) { + typeBuilder.setPrecision(relDataType.getPrecision()); + } + break; + case VARDECIMAL: + typeBuilder.setPrecision(relDataType.getPrecision()); + typeBuilder.setScale(relDataType.getScale()); + } + if (relDataType.isNullable()) { + typeBuilder.setMode(TypeProtos.DataMode.OPTIONAL); + } else { + typeBuilder.setMode(TypeProtos.DataMode.REQUIRED); + } + return typeBuilder.build(); + } + + /** * Given a Calcite's SqlTypeName, return a Drill's corresponding TypeProtos.MinorType */ public static TypeProtos.MinorType getDrillTypeFromCalciteType(final SqlTypeName 
sqlTypeName) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 4c28887..88bba32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -204,6 +204,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET_VALIDATOR), + new OptionDefinition(ExecConstants.HIVE_MAPRDB_JSON_ALL_TEXT_MODE_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR), new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION), new OptionDefinition(ExecConstants.AFFINITY_FACTOR), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index c71cc3d..ac83a71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -90,6 +90,7 @@ public class EasyGroupScan extends AbstractFileGroupScan { @JacksonInject StoragePluginRegistry engineRegistry, @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("selectionRoot") Path selectionRoot, + // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata @JsonProperty("schema") TupleSchema schema ) throws IOException, ExecutionSetupException { super(ImpersonationUtil.resolveUserName(userName)); @@ -271,7 +272,7 @@ public class EasyGroupScan extends AbstractFileGroupScan { 
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId)); EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, - columns, selectionRoot, partitionDepth, getTableMetadata().getSchema()); + columns, selectionRoot, partitionDepth, getSchema()); subScan.setOperatorId(this.getOperatorId()); return subScan; } @@ -297,7 +298,7 @@ public class EasyGroupScan extends AbstractFileGroupScan { @Override public String toString() { String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s, schema=%s]"; - return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles(), getTableMetadata().getSchema()); + return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles(), getSchema()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java index 1c793a5..5895ba5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java @@ -17,11 +17,16 @@ */ package org.apache.drill.exec.vector.complex.fn; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.complex.impl.ComplexCopier; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.List; @@ -33,25 +38,51 @@ public class JsonReaderUtils { boolean 
allTextMode, List<BaseWriter.ListWriter> emptyArrayWriters) { - List<BaseWriter.MapWriter> writerList = Lists.newArrayList(); - List<PathSegment> fieldPathList = Lists.newArrayList(); + ensureAtLeastOneField(writer, columns, null /* schema */, allTextMode, emptyArrayWriters); + } + + public static void ensureAtLeastOneField(BaseWriter.ComplexWriter writer, + Collection<SchemaPath> columns, + TupleMetadata schema, + boolean allTextMode, + List<BaseWriter.ListWriter> emptyArrayWriters) { + + List<BaseWriter.MapWriter> writerList = new ArrayList<>(); + List<PathSegment> fieldPathList = new ArrayList<>(); + List<TypeProtos.MajorType> types = new ArrayList<>(); BitSet emptyStatus = new BitSet(columns.size()); - int i = 0; + int fieldIndex = 0; // first pass: collect which fields are empty - for (SchemaPath sp : columns) { - PathSegment fieldPath = sp.getRootSegment(); + for (SchemaPath schemaPath : columns) { + PathSegment fieldPath = schemaPath.getRootSegment(); BaseWriter.MapWriter fieldWriter = writer.rootAsMap(); + TupleMetadata columnMetadata = schema; while (fieldPath.getChild() != null && !fieldPath.getChild().isArray()) { - fieldWriter = fieldWriter.map(fieldPath.getNameSegment().getPath()); + String name = fieldPath.getNameSegment().getPath(); + if (columnMetadata != null) { + ColumnMetadata metadata = columnMetadata.metadata(name); + columnMetadata = metadata != null ? metadata.mapSchema() : null; + } + fieldWriter = fieldWriter.map(name); fieldPath = fieldPath.getChild(); } writerList.add(fieldWriter); fieldPathList.add(fieldPath); + // for the case when field is absent in the schema, use VARCHAR type + // if allTextMode is enabled or INT type if it is disabled + TypeProtos.MajorType majorType = allTextMode + ? Types.optional(TypeProtos.MinorType.VARCHAR) + : Types.optional(TypeProtos.MinorType.INT); + if (columnMetadata != null) { + ColumnMetadata metadata = columnMetadata.metadata(fieldPath.getNameSegment().getPath()); + majorType = metadata != null ? 
metadata.majorType() : majorType; + } + types.add(majorType); if (fieldWriter.isEmptyMap()) { - emptyStatus.set(i, true); + emptyStatus.set(fieldIndex, true); } - if (i == 0 && !allTextMode) { + if (fieldIndex == 0 && !allTextMode && schema == null) { // when allTextMode is false, there is not much benefit to producing all // the empty fields; just produce 1 field. The reason is that the type of the // fields is unknown, so if we produce multiple Integer fields by default, a @@ -61,7 +92,7 @@ public class JsonReaderUtils { // is necessary in order to avoid schema change exceptions by downstream operators. break; } - i++; + fieldIndex++; } // second pass: create default typed vectors corresponding to empty fields @@ -72,11 +103,7 @@ public class JsonReaderUtils { BaseWriter.MapWriter fieldWriter = writerList.get(j); PathSegment fieldPath = fieldPathList.get(j); if (emptyStatus.get(j)) { - if (allTextMode) { - fieldWriter.varChar(fieldPath.getNameSegment().getPath()); - } else { - fieldWriter.integer(fieldPath.getNameSegment().getPath()); - } + ComplexCopier.getMapWriterForType(types.get(j), fieldWriter, fieldPath.getNameSegment().getPath()); } } @@ -91,4 +118,4 @@ public class JsonReaderUtils { } } } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java b/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java index cbcf7fb..ca5dfd9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java @@ -18,11 +18,14 @@ package org.apache.drill.metastore; import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.metadata.schema.SchemaProvider; import 
org.apache.drill.exec.store.parquet.ParquetTableMetadataProviderImpl; import org.apache.drill.metastore.metadata.TableMetadataProvider; import org.apache.drill.metastore.metadata.TableMetadataProviderBuilder; +import java.io.IOException; + /** * Implementation of {@link MetadataProviderManager} which uses file system providers and returns * builders for file system based {@link TableMetadataProvider} instances. @@ -38,6 +41,34 @@ public class FileSystemMetadataProviderManager implements MetadataProviderManage return new FileSystemMetadataProviderManager(); } + /** + * Returns {@link TableMetadataProvider} which provides specified schema. + * + * @param schema table schema which should be provided + * @return {@link TableMetadataProvider} which provides specified schema + * @throws IOException if exception has happened during {@link TableMetadataProvider} construction + */ + public static TableMetadataProvider getMetadataProviderForSchema(TupleMetadata schema) throws IOException { + return new FileSystemMetadataProviderManager().builder(MetadataProviderKind.SCHEMA_STATS_ONLY) + .withSchema(schema) + .build(); + } + + /** + * Checks whether specified {@link MetadataProviderManager} is not null and returns {@link TableMetadataProvider} + * obtained from specified {@link MetadataProviderManager}. + * Otherwise {@link FileSystemMetadataProviderManager} is used to construct {@link TableMetadataProvider}. + * + * @param providerManager metadata provider manager + * @return {@link TableMetadataProvider} instance + * @throws IOException if exception has happened during {@link TableMetadataProvider} construction + */ + public static TableMetadataProvider getMetadataProvider(MetadataProviderManager providerManager) throws IOException { + return providerManager == null + ?
new FileSystemMetadataProviderManager().builder(MetadataProviderKind.SCHEMA_STATS_ONLY).build() + : providerManager.getTableMetadataProvider(); + } + @Override public void setSchemaProvider(SchemaProvider schemaProvider) { this.schemaProvider = schemaProvider; diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index ea254a5..dbd73ce 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -606,6 +606,7 @@ drill.exec.options: { store.hive.parquet.optimize_scan_with_native_reader: false, store.hive.maprdb_json.optimize_scan_with_native_reader: false, store.hive.maprdb_json.read_timestamp_with_timezone_offset: false, + store.hive.maprdb_json.all_text_mode: false, # Properties values should NOT be set in double-quotes or any other quotes. # Property name and value should be separated by =. # Properties should be separated by new line (\n). diff --git a/exec/vector/src/main/codegen/templates/ComplexCopier.java b/exec/vector/src/main/codegen/templates/ComplexCopier.java index 6d7c7e6..c138ef6 100644 --- a/exec/vector/src/main/codegen/templates/ComplexCopier.java +++ b/exec/vector/src/main/codegen/templates/ComplexCopier.java @@ -24,6 +24,7 @@ package org.apache.drill.exec.vector.complex.impl; <#include "/@includes/vv_imports.ftl" /> +import org.apache.drill.common.types.TypeProtos; /* * This class is generated using freemarker and the ${.template_name} template. 
@@ -90,10 +91,10 @@ public class ComplexCopier { } break; } - } + } - private static FieldWriter getMapWriterForReader(FieldReader reader, MapWriter writer, String name) { - switch (reader.getType().getMinorType()) { + public static FieldWriter getMapWriterForType(TypeProtos.MajorType type, MapWriter writer, String name) { + switch (type.getMinorType()) { <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> <#assign fields = minor.fields!type.fields /> <#assign uncappedName = name?uncap_first/> @@ -102,7 +103,7 @@ public class ComplexCopier { return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}</#if>(name); <#elseif minor.class?contains("VarDecimal")> case ${name?upper_case}: - return (FieldWriter) writer.${uncappedName}(name, reader.getType().getScale(), reader.getType().getPrecision()); + return (FieldWriter) writer.${uncappedName}(name, type.getScale(), type.getPrecision()); </#if> </#list></#list> case MAP: @@ -110,7 +111,7 @@ public class ComplexCopier { case LIST: return (FieldWriter) writer.list(name); default: - throw new UnsupportedOperationException(reader.getType().toString()); + throw new UnsupportedOperationException(type.toString()); } } @@ -135,4 +136,8 @@ public class ComplexCopier { throw new UnsupportedOperationException(reader.getType().toString()); } } + + private static FieldWriter getMapWriterForReader(FieldReader reader, BaseWriter.MapWriter writer, String name) { + return getMapWriterForType(reader.getType(), writer, name); + } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java index 9bb6b8d..c02da5e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java @@ -17,6 +17,7 @@ */ package 
org.apache.drill.exec.record.metadata; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -67,6 +68,35 @@ public class RepeatedListBuilder implements SchemaContainer { return this; } + public RepeatedListBuilder addArray(MinorType type, int width) { + // Existing code uses the repeated list name as the name of + // the vector within the list. + + TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder() + .setMinorType(type) + .setMode(DataMode.REPEATED) + .setPrecision(width) + .build(); + + addColumn(MetadataUtils.newScalar(name, majorType)); + return this; + } + + public RepeatedListBuilder addArray(MinorType type, int precision, int scale) { + // Existing code uses the repeated list name as the name of + // the vector within the list. + + TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder() + .setMinorType(type) + .setMode(DataMode.REPEATED) + .setPrecision(precision) + .setScale(scale) + .build(); + + addColumn(MetadataUtils.newScalar(name, majorType)); + return this; + } + public RepeatedListColumnMetadata buildColumn() { return MetadataUtils.newRepeatedList(name, child); } diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java index 2bbf3c1..e0dc922 100644 --- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java +++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java @@ -57,6 +57,8 @@ public class SchemaPathUtils { /** * Adds column with specified schema path and type into specified {@code TupleMetadata schema}. 
+ * For the case when specified {@link SchemaPath} has children, corresponding maps will be created + * in the {@code TupleMetadata schema} and the last child of the map will have specified type. * * @param schema tuple schema where column should be added * @param schemaPath schema path of the column which should be added