HIVE-19494: Accept shade prefix during reflective instantiation of output format (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66f6748a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66f6748a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66f6748a

Branch: refs/heads/branch-3.0.0
Commit: 66f6748a069aca27532b5e99721b6230145886db
Parents: c740e32
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Mon May 14 16:11:18 2018 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Mon May 14 16:11:18 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/JavaUtils.java    |  4 ++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 ++
 .../hive/streaming/AbstractRecordWriter.java    | 50 ++++++++++++--------
 3 files changed, 37 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/66f6748a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java 
b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index e09dec1..c011cd1 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -71,6 +71,10 @@ public final class JavaUtils {
     return classLoader;
   }
 
+  public static Class loadClass(String shadePrefix, String className) throws 
ClassNotFoundException {
+    return loadClass(shadePrefix + "." + className);
+  }
+
   public static Class loadClass(String className) throws 
ClassNotFoundException {
     return loadClass(className, true);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/66f6748a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e56c14f..b81c47d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1911,6 +1911,9 @@ public class HiveConf extends Configuration {
       "Hive streaming ingest has auto flush mechanism to flush all open record 
updaters under memory pressure.\n" +
         "When memory usage exceed 
hive.heap.memory.monitor.default.usage.threshold, the auto-flush mechanism will 
\n" +
         "wait until this size (default 100Mb) of records are ingested before 
triggering flush."),
+    HIVE_CLASSLOADER_SHADE_PREFIX("hive.classloader.shade.prefix", "", "During 
reflective instantiation of a class\n" +
+      "(input, output formats, serde etc.), when classloader throws 
ClassNotFoundException, as a fallback this\n" +
+      "shade prefix will be used before class reference and retried."),
 
     
HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", 
false,
         "Whether to enable using file metadata cache in metastore for ORC file 
footers."),

http://git-wip-us.apache.org/repos/asf/hive/blob/66f6748a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 685e0cc..b6c8890 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -128,31 +128,41 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     if (conn == null) {
       throw new StreamingException("Streaming connection cannot be null during 
record writer initialization");
     }
+    this.conn = conn;
+    this.curBatchMinWriteId = minWriteId;
+    this.curBatchMaxWriteId = maxWriteId;
+    this.conf = conn.getHiveConf();
+    this.defaultPartitionName = 
conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+    this.table = conn.getTable();
+    this.inputColumns = 
table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
+    this.inputTypes = 
table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
+    if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
+      this.partitionColumns = 
table.getPartitionKeys().stream().map(FieldSchema::getName)
+        .collect(Collectors.toList());
+      this.inputColumns.addAll(partitionColumns);
+      this.inputTypes
+        
.addAll(table.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList()));
+    }
+    this.fullyQualifiedTableName = 
Warehouse.getQualifiedName(table.getDbName(), table.getTableName());
+    String outFormatName = this.table.getSd().getOutputFormat();
     try {
-      this.conn = conn;
-      this.curBatchMinWriteId = minWriteId;
-      this.curBatchMaxWriteId = maxWriteId;
-      this.conf = conn.getHiveConf();
-      this.defaultPartitionName = 
conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
-      this.table = conn.getTable();
-      this.inputColumns = 
table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
-      this.inputTypes = 
table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
-      if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
-        this.partitionColumns = 
table.getPartitionKeys().stream().map(FieldSchema::getName)
-          .collect(Collectors.toList());
-        this.inputColumns.addAll(partitionColumns);
-        this.inputTypes
-          
.addAll(table.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList()));
-      }
-      this.fullyQualifiedTableName = 
Warehouse.getQualifiedName(table.getDbName(), table.getTableName());
-      String outFormatName = this.table.getSd().getOutputFormat();
       this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils
         .newInstance(JavaUtils.loadClass(outFormatName), conf);
-      setupMemoryMonitoring();
     } catch (ClassNotFoundException e) {
-      throw new StreamingException(e.getMessage(), e);
+      String shadePrefix = 
conf.getVar(HiveConf.ConfVars.HIVE_CLASSLOADER_SHADE_PREFIX);
+      if (shadePrefix != null && !shadePrefix.trim().isEmpty()) {
+        try {
+          LOG.info("Shade prefix: {} specified. Using as fallback to load 
{}..", shadePrefix, outFormatName);
+          this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils
+            .newInstance(JavaUtils.loadClass(shadePrefix, outFormatName), 
conf);
+        } catch (ClassNotFoundException e1) {
+          throw new StreamingException(e.getMessage(), e);
+        }
+      } else {
+        throw new StreamingException(e.getMessage(), e);
+      }
     }
-
+    setupMemoryMonitoring();
     try {
       final AbstractSerDe serDe = createSerde();
       this.inputRowObjectInspector = (StructObjectInspector) 
serDe.getObjectInspector();

Reply via email to