This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a35a6ea  HIVE-25628: Avoid unnecessary file ops if Iceberg table is 
LLAP cached (Adam Szita, reviewed by Marton Bod)
a35a6ea is described below

commit a35a6ea7c16888664a631470e1773319cab59ff6
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Thu Oct 28 15:12:08 2021 +0200

    HIVE-25628: Avoid unnecessary file ops if Iceberg table is LLAP cached 
(Adam Szita, reviewed by Marton Bod)
---
 .../mr/hive/vector/HiveVectorizedReader.java       |  12 ++-
 .../apache/iceberg/orc/VectorizedReadUtils.java    | 113 +++++++++++++++------
 2 files changed, 90 insertions(+), 35 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 4f5acea..1375ee8 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -114,19 +115,24 @@ public class HiveVectorizedReader {
           // Need to turn positional schema evolution off since we use column 
name based schema evolution for projection
           // and Iceberg will make a mapping between the file schema and the 
current reading schema.
           job.setBoolean(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), 
false);
-          VectorizedReadUtils.handleIcebergProjection(inputFile, task, job);
+
+          // TODO: Iceberg currently does not track the last modification time 
of a file. Until that's added,
+          // we need to set Long.MIN_VALUE as last modification time in the 
fileId triplet.
+          SyntheticFileId fileId = new SyntheticFileId(path, 
task.file().fileSizeInBytes(), Long.MIN_VALUE);
+
+          VectorizedReadUtils.handleIcebergProjection(inputFile, task, job, 
fileId);
 
           RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
 
           // If LLAP enabled, try to retrieve an LLAP record reader - this 
might yield to null in some special cases
           if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, 
LlapProxy.isDaemon()) &&
               LlapProxy.getIo() != null) {
-            recordReader = 
LlapProxy.getIo().llapVectorizedOrcReaderForPath(null, path, null, 
readColumnIds,
+            recordReader = 
LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, 
readColumnIds,
                 job, task.start(), task.length(), reporter);
           }
 
           if (recordReader == null) {
-            InputSplit split = new OrcSplit(path, null, task.start(), 
task.length(), (String[]) null, null, false,
+            InputSplit split = new OrcSplit(path, fileId, task.start(), 
task.length(), (String[]) null, null, false,
                  false, com.google.common.collect.Lists.newArrayList(), 0, 
task.length(), path.getParent(), null);
             recordReader = new 
VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
           }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index addf202..30f66b4 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -20,73 +20,122 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.iceberg.org.apache.orc.Reader;
 import org.apache.hive.iceberg.org.apache.orc.TypeDescription;
+import org.apache.hive.iceberg.org.apache.orc.impl.ReaderImpl;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.orc.impl.BufferChunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities that rely on Iceberg code from org.apache.iceberg.orc package.
  */
 public class VectorizedReadUtils {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(VectorizedReadUtils.class);
+
   private VectorizedReadUtils() {
 
   }
 
+  private static TypeDescription getSchemaForFile(InputFile inputFile, 
SyntheticFileId fileId, JobConf job)
+      throws IOException {
+    TypeDescription schema = null;
+
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, 
LlapProxy.isDaemon()) &&
+        LlapProxy.getIo() != null) {
+      MapWork mapWork = LlapHiveUtils.findMapWork(job);
+      Path path = new Path(inputFile.location());
+      PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, 
mapWork.getPathToPartitionInfo());
+
+      // Note: Since Hive doesn't know about partition information of Iceberg 
tables, partitionDesc is only used to
+      // deduce the table (and DB) name here.
+      CacheTag cacheTag = HiveConf.getBoolVar(job, 
HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
+          LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) 
: null;
+
+      try {
+        // Schema has to be serialized and deserialized as it is passed 
between different packages of TypeDescription:
+        // Iceberg expects 
org.apache.hive.iceberg.org.apache.orc.TypeDescription as it shades ORC, while 
LLAP provides
+        // the unshaded org.apache.orc.TypeDescription type.
+        BufferChunk tailBuffer = LlapProxy.getIo().getOrcTailFromCache(path, 
job, cacheTag, fileId).getTailBuffer();
+        schema = ReaderImpl.extractFileTail(tailBuffer.getData()).getSchema();
+      } catch (IOException ioe) {
+        LOG.warn("LLAP is turned on but was unable to get file metadata 
information through its cache for {}",
+            path, ioe);
+      }
+
+    }
+
+    // Fall back to opening the file with a plain ORC reader if LLAP is 
unavailable or the cache lookup failed.
+    if (schema == null) {
+      try (Reader orcFileReader = ORC.newFileReader(inputFile, job)) {
+        schema = orcFileReader.getSchema();
+      }
+    }
+
+    return schema;
+
+  }
+
   /**
    * Adjusts the jobConf so that column reorders and renames that might have 
happened since this ORC file was written
    * are properly mapped to the schema of the original file.
    * @param inputFile - the original ORC file - this needs to be accessed to 
retrieve the original schema for mapping
    * @param task - Iceberg task - required for
    * @param job - JobConf instance to adjust
+   * @param fileId - FileID for the input file, serves as cache key in an LLAP 
setup
    * @throws IOException - errors relating to accessing the ORC file
    */
-  public static void handleIcebergProjection(InputFile inputFile, FileScanTask 
task, JobConf job)
-      throws IOException {
-    Reader orcFileReader = ORC.newFileReader(inputFile, job);
-
-    try {
-      // We need to map with the current (i.e. current Hive table columns) 
full schema (without projections),
-      // as OrcInputFormat will take care of the projections by the use of an 
include boolean array
-      Schema currentSchema = task.spec().schema();
-      TypeDescription fileSchema = orcFileReader.getSchema();
-
-      TypeDescription readOrcSchema;
-      if (ORCSchemaUtil.hasIds(fileSchema)) {
-        readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, 
fileSchema);
-      } else {
-        TypeDescription typeWithIds =
-            ORCSchemaUtil.applyNameMapping(fileSchema, 
MappingUtil.create(currentSchema));
-        readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, 
typeWithIds);
-      }
+  public static void handleIcebergProjection(InputFile inputFile, FileScanTask 
task, JobConf job,
+      SyntheticFileId fileId) throws IOException {
+
+    // We need to map with the current (i.e. current Hive table columns) full 
schema (without projections),
+    // as OrcInputFormat will take care of the projections by the use of an 
include boolean array
+    Schema currentSchema = task.spec().schema();
+    TypeDescription fileSchema = getSchemaForFile(inputFile, fileId, job);
+
+    TypeDescription readOrcSchema;
+    if (ORCSchemaUtil.hasIds(fileSchema)) {
+      readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, 
fileSchema);
+    } else {
+      TypeDescription typeWithIds =
+          ORCSchemaUtil.applyNameMapping(fileSchema, 
MappingUtil.create(currentSchema));
+      readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, 
typeWithIds);
+    }
 
-      job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, 
readOrcSchema.toString());
+    job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString());
 
-      // Predicate pushdowns needs to be adjusted too in case of column 
renames, we let Iceberg generate this into job
-      if (task.residual() != null) {
-        Expression boundFilter = Binder.bind(currentSchema.asStruct(), 
task.residual(), false);
+    // Predicate pushdowns need to be adjusted too in case of column renames, 
we let Iceberg generate this into job
+    if (task.residual() != null) {
+      Expression boundFilter = Binder.bind(currentSchema.asStruct(), 
task.residual(), false);
 
-        // Note the use of the unshaded version of this class here (required 
for SARG deseralization later)
-        org.apache.hadoop.hive.ql.io.sarg.SearchArgument sarg =
-            ExpressionToOrcSearchArgument.convert(boundFilter, readOrcSchema);
-        if (sarg != null) {
-          job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
-          job.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
+      // Note the use of the unshaded version of this class here (required for 
SARG deserialization later)
+      org.apache.hadoop.hive.ql.io.sarg.SearchArgument sarg =
+          ExpressionToOrcSearchArgument.convert(boundFilter, readOrcSchema);
+      if (sarg != null) {
+        job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+        job.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
 
-          job.set(ConvertAstToSearchArg.SARG_PUSHDOWN, 
ConvertAstToSearchArg.sargToKryo(sarg));
-        }
+        job.set(ConvertAstToSearchArg.SARG_PUSHDOWN, 
ConvertAstToSearchArg.sargToKryo(sarg));
       }
-    } finally {
-      orcFileReader.close();
     }
   }
 }

Reply via email to