vvysotskyi commented on a change in pull request #1799: DRILL-7251: Read Hive array w/o nulls
URL: https://github.com/apache/drill/pull/1799#discussion_r289409389
##########
File path: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
##########

@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.drill.exec.store.hive.writers.HiveValueWriter;
+import org.apache.drill.exec.store.hive.writers.HiveValueWriterFactory;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Reader which uses a complex writer underneath to fill in value vectors with data read from Hive.
+ * At first glance the initialization code looks cumbersome, but its main aim is to prepare the list of key
+ * fields used in the next() and readHiveRecordAndInsertIntoRecordBatch(Object rowValue) methods.
+ * <p>
+ * In a nutshell, the reader is used in two stages:
+ * 1) The setup stage configures mapredReader, partitionObjInspector, partitionDeserializer, the list of
+ *    {@link HiveValueWriter}s for each column in the record batch, and partition vectors and values.
+ * 2) The reading stage uses the objects configured previously to get rows from InputSplits, represents each row
+ *    as a Struct of column values, and writes each column value of the row into Drill's value vectors
+ *    using the HiveValueWriter for that column.
+ */
+public class HiveDefaultRecordReader extends AbstractRecordReader {
+
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDefaultRecordReader.class);
+
+  /**
+   * Max number of records that can be consumed by one next() method call.
+   */
+  public static final int TARGET_RECORD_COUNT = 4000;
+
+  /**
+   * Key of partition columns in grouped map.
+   */
+  private static final boolean PARTITION_COLUMNS = true;
+
+  /**
+   * Manages all writes to value vectors received using OutputMutator.
+   */
+  protected VectorContainerWriter outputWriter;
+
+  /**
+   * Before creation of the mapredReader, Drill creates this object which holds
+   * metadata about the table that should be read.
+   */
+  private final HiveTableWithColumnCache hiveTable;
+
+  /**
+   * Contains info about the current user and group. Used to initialize
+   * the mapredReader under the user's permissions.
+   */
+  private final UserGroupInformation proxyUserGroupInfo;
+
+  /**
+   * Config to be used for creation of the JobConf instance.
+   */
+  private final HiveConf hiveConf;
+
+  /**
+   * Hive partition wrapper with the index of the column list in ColumnListsCache.
+   */
+  private final HivePartition partition;
+
+  /**
+   * JobConf instance used to supply metadata to the mapredReader.
+   * The job itself is never executed.
+   */
+  private JobConf job;
+
+  /**
+   * Deserializer to be used for deserialization of a row.
+   * Depending on partition presence it may be the partition or the table deserializer.
+   */
+  protected Deserializer partitionDeserializer;
+
+  /**
+   * Used to inspect rows parsed by partitionDeserializer.
+   */
+  private StructObjectInspector partitionObjInspector;
+
+  /**
+   * Converts values deserialized using partitionDeserializer.
+   */
+  protected ObjectInspectorConverters.Converter partitionToTableSchemaConverter;
+
+  /**
+   * Used to inspect the rowValue of each column.
+   */
+  private StructObjectInspector finalObjInspector;
+
+  /**
+   * For each column to be read we assign a concrete writer
+   * which encapsulates writing of column values read from Hive into
+   * a specific value vector.
+   */
+  private HiveValueWriter[] columnValueWriters;
+
+  /**
+   * At the moment of mapredReader instantiation we can check inputSplits:
+   * if splits aren't present then there are no records to read,
+   * so the mapredReader can finish work early.
+   */
+  protected boolean empty;
+
+  /**
+   * Buffer used for population of partition vectors and for filling data into vectors via writers.
+   */
+  private final DrillBuf drillBuf;
+
+  /**
+   * The fragmentContext holds different helper objects
+   * associated with the fragment. In the reader it's used
+   * to get options for accurate detection of partition column types.
+   */
+  private final FragmentContext fragmentContext;
+
+  /**
+   * Partition vectors and values are linked together and get filled after we know that all records are read.
+   * These two arrays must have the same size.
+   */
+  private ValueVector[] partitionVectors;
+
+  /**
+   * Values to be written into partition vectors.
+   */
+  private Object[] partitionValues;
+
+
+  /**
+   * InputSplits to be processed by the mapredReader.
+   */
+  private final Iterator<InputSplit> inputSplitsIterator;
+
+  /**
+   * Reader used to get data from InputSplits.
+   */
+  protected RecordReader<Object, Object> mapredReader;
+
+  /**
+   * Helper object used together with mapredReader to get data from an InputSplit.
+   */
+  private Object key;
+
+  /**
+   * Helper object used together with mapredReader to get data from an InputSplit.
+   */
+  protected Object valueHolder;
+
+  /**
+   * Array of StructFields representing columns to be read by the reader.
+   * Used to extract the row value of a column from the final object inspector.
+   */
+  private StructField[] selectedStructFieldRefs;
+
+
+  /**
+   * Reader's constructor called by the initializer.
+   *
+   * @param table            metadata about the Hive table being read
+   * @param partition        holder of metadata about table partitioning
+   * @param inputSplits      input splits for reading data from distributed storage
+   * @param projectedColumns target columns for the scan
+   * @param context          fragmentContext of the fragment
+   * @param hiveConf         Hive configuration
+   * @param proxyUgi         user/group info to be used for initialization
+   */
+  public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
+                                 Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+    this.hiveTable = table;
+    this.partition = partition;
+    this.hiveConf = hiveConf;
+    this.proxyUserGroupInfo = proxyUgi;
+    this.empty = inputSplits == null || inputSplits.isEmpty();
+    this.inputSplitsIterator = empty ?
+        Collections.emptyIterator() : inputSplits.iterator();
+    this.drillBuf = context.getManagedBuffer().reallocIfNeeded(256);
+    this.partitionVectors = new ValueVector[0];
+    this.partitionValues = new Object[0];
+    setColumns(projectedColumns);
+    this.fragmentContext = context;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    Callable<Void> userSpecificInitTask = () -> {
+      this.job = new JobConf(hiveConf);
+      Properties hiveTableProperties = HiveUtilities.getTableMetadata(hiveTable);
+      final Deserializer tableDeserializer = createDeserializer(job, hiveTable.getSd(), hiveTableProperties);
+      final StructObjectInspector tableObjInspector = getStructOI(tableDeserializer);
+
+      if (partition == null) {
+        this.partitionDeserializer = tableDeserializer;
+        this.partitionObjInspector = tableObjInspector;
+        this.partitionToTableSchemaConverter = (obj) -> obj;
+        this.finalObjInspector = tableObjInspector;
+
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, hiveTable.getSd(), hiveTable));
+        HiveUtilities.verifyAndAddTransactionalProperties(job, hiveTable.getSd());
+      } else {
+        this.partitionDeserializer = createDeserializer(job, partition.getSd(), HiveUtilities.getPartitionMetadata(partition, hiveTable));
+        this.partitionObjInspector = getStructOI(partitionDeserializer);
+
+        this.finalObjInspector = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(partitionObjInspector, tableObjInspector);
+        this.partitionToTableSchemaConverter = ObjectInspectorConverters.getConverter(partitionObjInspector, finalObjInspector);
+
+        this.job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), hiveTable));
+        HiveUtilities.verifyAndAddTransactionalProperties(job, partition.getSd());
+      }
+
+
+      final List<FieldSchema> partitionKeyFields = hiveTable.getPartitionKeys();
+      final List<String> partitionColumnNames = partitionKeyFields.stream().map(FieldSchema::getName).collect(Collectors.toList());
+      final List<String> allTableColumnNames = ((StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalObjInspector)).getAllStructFieldNames();
+
+      /*
+        ------- Determining which regular/partition column names should be selected
+      */
+      List<String> selectedColumnNames;
+      List<String> selectedPartitionColumnNames;
+      List<Integer> idsOfProjectedColumns;
+
+      if (isStarQuery()) {
+        selectedColumnNames = allTableColumnNames;
+        selectedPartitionColumnNames = partitionColumnNames;
+        idsOfProjectedColumns = IntStream.range(0, selectedColumnNames.size()).boxed().collect(Collectors.toList());
+      } else {
+        Map<Boolean, List<String>> groupOfSelectedColumns = getColumns().stream()
+            .map(SchemaPath::getRootSegment)
+            .map(PathSegment.NameSegment::getPath)
+            .distinct()
+            .collect(Collectors.groupingBy(partitionColumnNames::contains));
+
+        selectedColumnNames = groupOfSelectedColumns.getOrDefault(!PARTITION_COLUMNS, Collections.emptyList());
+        selectedPartitionColumnNames = groupOfSelectedColumns.getOrDefault(PARTITION_COLUMNS, Collections.emptyList());
+        idsOfProjectedColumns = selectedColumnNames.stream().map(allTableColumnNames::indexOf).collect(Collectors.toList());
+      }
+
+      ColumnProjectionUtils.appendReadColumns(job, idsOfProjectedColumns, selectedColumnNames,
+          getColumns().stream().map(SchemaPath::getRootSegmentPath).collect(Collectors.toList()));
+
+      /*
+        --------- Initialize selectedStructFieldRefs and columnValueWriters, which are two key collections of
+        --------- objects used to read column values of each row and save them into Drill's value vectors
+      */
+      this.selectedStructFieldRefs = new StructField[selectedColumnNames.size()];
+      this.columnValueWriters = new HiveValueWriter[selectedColumnNames.size()];
+      this.outputWriter = new VectorContainerWriter(output, /*enabled union*/ false);
+      HiveValueWriterFactory hiveColumnValueWriterFactory = new HiveValueWriterFactory(drillBuf, outputWriter.getWriter());
+      for (int refIdx = 0; refIdx < selectedStructFieldRefs.length; refIdx++) {
+        String columnName = selectedColumnNames.get(refIdx);
+        StructField fieldRef = finalObjInspector.getStructFieldRef(columnName);
+        this.selectedStructFieldRefs[refIdx] = fieldRef;
+        this.columnValueWriters[refIdx] = hiveColumnValueWriterFactory.createHiveColumnValueWriter(columnName, fieldRef);
+      }
+
+      /*
+        --------- Defining selected partition vectors and values to be filled into them
+      */
+      if (partition != null && selectedPartitionColumnNames.size() > 0) {
+        List<ValueVector> partitionVectorList = new ArrayList<>(selectedPartitionColumnNames.size());
+        List<Object> partitionValueList = new ArrayList<>(selectedPartitionColumnNames.size());
+        String defaultPartitionValue = hiveConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname);
+        OptionManager options = fragmentContext.getOptions();
+        for (int i = 0; i < partitionKeyFields.size(); i++) {
+          FieldSchema field = partitionKeyFields.get(i);
+          String partitionColumnName = field.getName();
+          if (selectedPartitionColumnNames.contains(partitionColumnName)) {
+            TypeInfo partitionColumnTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+            TypeProtos.MajorType majorType = HiveUtilities.getMajorTypeFromHiveTypeInfo(partitionColumnTypeInfo, options);
+            MaterializedField materializedField = MaterializedField.create(partitionColumnName, majorType);
+            Class<? extends ValueVector> partitionVectorClass = TypeHelper.getValueVectorClass(materializedField.getType().getMinorType(),
+                materializedField.getDataMode());
+
+            ValueVector partitionVector = output.addField(materializedField, partitionVectorClass);
+            partitionVectorList.add(partitionVector);
+            Object partitionValue = HiveUtilities.convertPartitionType(partitionColumnTypeInfo, partition.getValues().get(i), defaultPartitionValue);
+            partitionValueList.add(partitionValue);
+          }
+        }
+        this.partitionVectors = partitionVectorList.toArray(new ValueVector[0]);
+        this.partitionValues = partitionValueList.toArray();
+      }
+
+      if (!empty && initNextReader(job)) {
+        key = mapredReader.createKey();
+        valueHolder = mapredReader.createValue();
+        internalInit(hiveTableProperties);
+      }
+      return null;
+    };
+
+
+    ListenableFuture<Void> initTaskFuture = context.runCallableAs(proxyUserGroupInfo, userSpecificInitTask);
+    try {
+      initTaskFuture.get();
+    } catch (InterruptedException e) {
+      initTaskFuture.cancel(true);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Default implementation does nothing; used to apply the skip header/footer functionality.
+   *
+   * @param hiveTableProperties hive table properties
+   */
+  protected void internalInit(Properties hiveTableProperties) {
+  }
+
+  @Override
+  public int next() {
+    outputWriter.allocate();
+    outputWriter.reset();
+    if (empty) {
+      outputWriter.setValueCount(0);
+      populatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      int recordCount;
+      for (recordCount = 0; (recordCount < TARGET_RECORD_COUNT && hasNextValue(valueHolder)); recordCount++) {
+        Object deserializedHiveRecord = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) valueHolder));
+        outputWriter.setPosition(recordCount);
+        readHiveRecordAndInsertIntoRecordBatch(deserializedHiveRecord);
+      }
+      outputWriter.setValueCount(recordCount);
+      populatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (ExecutionSetupException | IOException | SerDeException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  protected void readHiveRecordAndInsertIntoRecordBatch(Object rowValue) {
+    for (int columnRefIdx = 0; columnRefIdx < selectedStructFieldRefs.length; columnRefIdx++) {
+      Object columnValue = finalObjInspector.getStructFieldData(rowValue, selectedStructFieldRefs[columnRefIdx]);
+      if (columnValue == null) {

Review comment:
   Please invert this condition; additional loop breakers make the code harder to read.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
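
For clarity, a minimal sketch of the inversion being requested: write only when the column value is present instead of skipping the rest of the loop body on null. The quoted hunk cuts off at the null check, so the columnValueWriters[columnRefIdx].write(columnValue) call inside the branch is assumed here for illustration rather than taken from the diff:

   protected void readHiveRecordAndInsertIntoRecordBatch(Object rowValue) {
     for (int columnRefIdx = 0; columnRefIdx < selectedStructFieldRefs.length; columnRefIdx++) {
       Object columnValue = finalObjInspector.getStructFieldData(rowValue, selectedStructFieldRefs[columnRefIdx]);
       // Inverted condition: non-null values are written; nulls simply fall through
       // to the next iteration without a continue/break.
       if (columnValue != null) {
         columnValueWriters[columnRefIdx].write(columnValue);
       }
     }
   }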