HIVE-19494: Accept shade prefix during reflective instantiation of output format (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66f6748a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66f6748a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66f6748a Branch: refs/heads/branch-3.0.0 Commit: 66f6748a069aca27532b5e99721b6230145886db Parents: c740e32 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Mon May 14 16:11:18 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon May 14 16:11:18 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/JavaUtils.java | 4 ++ .../org/apache/hadoop/hive/conf/HiveConf.java | 3 ++ .../hive/streaming/AbstractRecordWriter.java | 50 ++++++++++++-------- 3 files changed, 37 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/66f6748a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index e09dec1..c011cd1 100644 --- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -71,6 +71,10 @@ public final class JavaUtils { return classLoader; } + public static Class loadClass(String shadePrefix, String className) throws ClassNotFoundException { + return loadClass(shadePrefix + "." + className); + } + public static Class loadClass(String className) throws ClassNotFoundException { return loadClass(className, true); } http://git-wip-us.apache.org/repos/asf/hive/blob/66f6748a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e56c14f..b81c47d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1911,6 +1911,9 @@ public class HiveConf extends Configuration { "Hive streaming ingest has auto flush mechanism to flush all open record updaters under memory pressure.\n" + "When memory usage exceed hive.heap.memory.monitor.default.usage.threshold, the auto-flush mechanism will \n" + "wait until this size (default 100Mb) of records are ingested before triggering flush."), + HIVE_CLASSLOADER_SHADE_PREFIX("hive.classloader.shade.prefix", "", "During reflective instantiation of a class\n" + + "(input, output formats, serde etc.), when classloader throws ClassNotFoundException, as a fallback this\n" + + "shade prefix will be used before class reference and retried."), HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false, "Whether to enable using file metadata cache in metastore for ORC file footers."), http://git-wip-us.apache.org/repos/asf/hive/blob/66f6748a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 685e0cc..b6c8890 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -128,31 +128,41 @@ public abstract class AbstractRecordWriter implements RecordWriter { if (conn == null) { throw new StreamingException("Streaming connection cannot be null during record writer initialization"); } + this.conn = conn; + this.curBatchMinWriteId = minWriteId; + this.curBatchMaxWriteId = maxWriteId; + this.conf = conn.getHiveConf(); + this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME); + this.table = conn.getTable(); + this.inputColumns = table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList()); + this.inputTypes = table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList()); + if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) { + this.partitionColumns = table.getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toList()); + this.inputColumns.addAll(partitionColumns); + this.inputTypes + .addAll(table.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList())); + } + this.fullyQualifiedTableName = Warehouse.getQualifiedName(table.getDbName(), table.getTableName()); + String outFormatName = this.table.getSd().getOutputFormat(); try { - this.conn = conn; - this.curBatchMinWriteId = minWriteId; - this.curBatchMaxWriteId = maxWriteId; - this.conf = conn.getHiveConf(); - this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME); - this.table = conn.getTable(); - this.inputColumns = table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList()); - this.inputTypes = table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList()); - if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) { - this.partitionColumns = table.getPartitionKeys().stream().map(FieldSchema::getName) - .collect(Collectors.toList()); - this.inputColumns.addAll(partitionColumns); - this.inputTypes - .addAll(table.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList())); - } - this.fullyQualifiedTableName = Warehouse.getQualifiedName(table.getDbName(), table.getTableName()); - String outFormatName = this.table.getSd().getOutputFormat(); this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils .newInstance(JavaUtils.loadClass(outFormatName), conf); - setupMemoryMonitoring(); } catch (ClassNotFoundException e) { - throw new StreamingException(e.getMessage(), e); + String shadePrefix = conf.getVar(HiveConf.ConfVars.HIVE_CLASSLOADER_SHADE_PREFIX); + if (shadePrefix != null && !shadePrefix.trim().isEmpty()) { + try { + LOG.info("Shade prefix: {} specified. Using as fallback to load {}..", shadePrefix, outFormatName); + this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils + .newInstance(JavaUtils.loadClass(shadePrefix, outFormatName), conf); + } catch (ClassNotFoundException e1) { + throw new StreamingException(e.getMessage(), e); + } + } else { + throw new StreamingException(e.getMessage(), e); + } } - + setupMemoryMonitoring(); try { final AbstractSerDe serDe = createSerde(); this.inputRowObjectInspector = (StructObjectInspector) serDe.getObjectInspector();