vvysotskyi commented on a change in pull request #1799: DRILL-7251: Read Hive array w/o nulls
URL: https://github.com/apache/drill/pull/1799#discussion_r289389523
 
 

 ##########
 File path: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
 ##########
 @@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.drill.exec.store.hive.writers.HiveValueWriter;
+import org.apache.drill.exec.store.hive.writers.HiveValueWriterFactory;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Reader which uses the complex writer underneath to fill in value vectors with data read from Hive.
+ * At first glance the initialization code in the reader looks cumbersome, but its main aim is to prepare the list of key
+ * fields used in the next() and readHiveRecordAndInsertIntoRecordBatch(Object rowValue) methods.
+ * <p>
+ * In a nutshell, the reader is used in two stages:
+ * 1) The setup stage configures mapredReader, partitionObjInspector, partitionDeserializer, the list of {@link HiveValueWriter}s
+ *    for each column in the record batch, partition vectors and values.
+ * 2) The reading stage uses the previously configured objects to get rows from InputSplits, represents each row as a Struct
+ *    of column values, and writes each column value of the row into Drill's value vectors using the HiveValueWriter for that column.
+ */
+public class HiveDefaultRecordReader extends AbstractRecordReader {
+
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDefaultRecordReader.class);
+
+  /**
+   * Maximum number of records that can be consumed by one next() method call.
+   */
+  public static final int TARGET_RECORD_COUNT = 4000;
+
+  /**
+   * Key of partition columns in grouped map.
+   */
+  private static final boolean PARTITION_COLUMNS = true;
+
+  /**
+   * Manages all writes to value vectors received using OutputMutator
+   */
+  protected VectorContainerWriter outputWriter;
+
+  /**
+   * Before creating the mapredReader, Drill creates an object which holds
+   * metadata about the table to be read.
+   */
+  private final HiveTableWithColumnCache hiveTable;
+
+  /**
+   * Contains info about the current user and group. Used to initialize
+   * the mapredReader under the user's permissions.
+   */
+  private final UserGroupInformation proxyUserGroupInfo;
+
+  /**
+   * Config to be used for creation of JobConf instance.
+   */
+  private final HiveConf hiveConf;
+
+  /**
+   * Hive partition wrapper with index of column list in ColumnListsCache.
+   */
+  private final HivePartition partition;
+
+  /**
+   * The reader creates this JobConf instance to make use of its metadata.
+   * The job itself won't actually be executed.
+   */
+  private JobConf job;
+
+  /**
+   * Deserializer used to deserialize a row.
+   * Depending on partition presence, it may be the partition or the table deserializer.
+   */
+  protected Deserializer partitionDeserializer;
+
+  /**
+   * Used to inspect rows parsed by partitionDeserializer
+   */
+  private StructObjectInspector partitionObjInspector;
+
+  /**
+   * Converts value deserialized using partitionDeserializer
+   */
+  protected ObjectInspectorConverters.Converter partitionToTableSchemaConverter;
+
+  /**
+   * Used to inspect rowValue of each column
+   */
+  private StructObjectInspector finalObjInspector;
+
+  /**
+   * For each concrete column to be read we assign a concrete writer
+   * which encapsulates writing of column values read from Hive into
+   * a specific value vector.
+   */
+  private HiveValueWriter[] columnValueWriters;
+
+  /**
+   * At the moment of mapredReader instantiation we can check inputSplits;
+   * if splits aren't present, then there are no records to read,
+   * so the mapredReader can finish work early.
+   */
+  protected boolean empty;
+
+  /**
+   * Buffer used for population of partition vectors and to fill data into vectors via writers.
+   */
+  private final DrillBuf drillBuf;
+
+  /**
+   * The fragmentContext holds various helper objects
+   * associated with the fragment. In the reader it's used
+   * to get options for accurate detection of partition column types.
+   */
+  private final FragmentContext fragmentContext;
+
+  /**
+   * Partition vectors and values are linked together and get filled after we know that all records have been read.
+   * These two arrays must have the same size.
+   */
+  private ValueVector[] partitionVectors;
+
+  /**
+   * Values to be written into partition vector.
+   */
+  private Object[] partitionValues;
+
+
+  /**
+   * InputSplits to be processed by mapredReader.
+   */
+  private final Iterator<InputSplit> inputSplitsIterator;
+
+  /**
+   * Reader used to get data from InputSplits.
+   */
+  protected RecordReader<Object, Object> mapredReader;
+
+  /**
+   * Helper object used together with mapredReader to get data from InputSplit.
+   */
+  private Object key;
+
+  /**
+   * Helper object used together with mapredReader to get data from InputSplit.
+   */
+  protected Object valueHolder;
+
+  /**
+   * Array of StructField representing columns to be read by the reader.
+   * Used to extract a column's value from a row via the final object inspector.
+   */
+  private StructField[] selectedStructFieldRefs;
+
+
+  /**
+   * Reader's constructor called by the initializer.
+   *
+   * @param table            metadata about Hive table being read
+   * @param partition        holder of metadata about table partitioning
+   * @param inputSplits      input splits for reading data from distributed storage
+   * @param projectedColumns target columns for scan
+   * @param context          fragmentContext of fragment
+   * @param hiveConf         Hive configuration
+   * @param proxyUgi         user/group info to be used for initialization
+   */
+  public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
+                                 Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+    this.hiveTable = table;
+    this.partition = partition;
+    this.hiveConf = hiveConf;
+    this.proxyUserGroupInfo = proxyUgi;
+    this.empty = inputSplits == null || inputSplits.isEmpty();
+    this.inputSplitsIterator = empty ? Collections.emptyIterator() : inputSplits.iterator();
+    this.drillBuf = context.getManagedBuffer().reallocIfNeeded(256);
+    this.partitionVectors = new ValueVector[0];
+    this.partitionValues = new Object[0];
+    setColumns(projectedColumns);
+    this.fragmentContext = context;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    Callable<Void> userSpecificInitTask = () -> {
+      this.job = new JobConf(hiveConf);
+      Properties hiveTableProperties = HiveUtilities.getTableMetadata(hiveTable);
+      final Deserializer tableDeserializer = createDeserializer(job, hiveTable.getSd(), hiveTableProperties);
+      final StructObjectInspector tableObjInspector = getStructOI(tableDeserializer);
+
+      if (partition == null) {
+        this.partitionDeserializer = tableDeserializer;
+        this.partitionObjInspector = tableObjInspector;
+        this.partitionToTableSchemaConverter = (obj) -> obj;
+        this.finalObjInspector = tableObjInspector;
+
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, hiveTable.getSd(), hiveTable));
+        HiveUtilities.verifyAndAddTransactionalProperties(job, hiveTable.getSd());
+      } else {
+        this.partitionDeserializer = createDeserializer(job, partition.getSd(), HiveUtilities.getPartitionMetadata(partition, hiveTable));
+        this.partitionObjInspector = getStructOI(partitionDeserializer);
+
+        this.finalObjInspector = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(partitionObjInspector, tableObjInspector);
+        this.partitionToTableSchemaConverter = ObjectInspectorConverters.getConverter(partitionObjInspector, finalObjInspector);
+
+        this.job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), hiveTable));
+        HiveUtilities.verifyAndAddTransactionalProperties(job, partition.getSd());
+      }
+
+
+      final List<FieldSchema> partitionKeyFields = hiveTable.getPartitionKeys();
+      final List<String> partitionColumnNames = partitionKeyFields.stream().map(FieldSchema::getName).collect(Collectors.toList());
 
 Review comment:
   Please move the stream calls onto new lines:
   ```suggestion
         final List<String> partitionColumnNames = partitionKeyFields.stream()
             .map(FieldSchema::getName)
             .collect(Collectors.toList());
   ```
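   
   As a standalone illustration of the suggested one-call-per-line stream layout (an editorial sketch with hypothetical names, not code from this PR):
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;
   
   public class StreamLayoutExample {
     public static void main(String[] args) {
       // Hypothetical partition key names, stand-ins for the FieldSchema::getName results
       List<String> partitionKeyNames = Arrays.asList("year", "month", "day");
   
       // Each chained stream call sits on its own line, indented under the assignment
       List<String> partitionColumnNames = partitionKeyNames.stream()
           .map(String::toUpperCase)
           .collect(Collectors.toList());
   
       System.out.println(partitionColumnNames); // prints [YEAR, MONTH, DAY]
     }
   }
   ```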

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
