Repository: hive
Updated Branches:
  refs/heads/branch-1 e195d9983 -> 0e47786f5
HIVE-10816: NPE in ExecDriver::handleSampling when submitted via child JVM (Rui reviewed by Xuefu)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e47786f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e47786f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e47786f

Branch: refs/heads/branch-1
Commit: 0e47786f54b2b61ce5c8dec4f962e8cd4ca5ba4d
Parents: e195d99
Author: Rui Li <rui...@intel.com>
Authored: Wed Jun 10 10:03:41 2015 +0800
Committer: Rui Li <rui...@intel.com>
Committed: Wed Jun 10 10:04:37 2015 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/PartitionKeySampler.java |  9 +++++----
 .../org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java   | 12 ++++++------
 2 files changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/0e47786f/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
index 96f4530..dc1b601 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
@@ -112,7 +112,7 @@ public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
     return partitionKeys;
   }
 
-  public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
+  public void writePartitionKeys(Path path, JobConf job) throws IOException {
     byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
     int numPartition = partitionKeys.length + 1;
     if (numPartition != job.getNumReduceTasks()) {
@@ -133,10 +133,11 @@ public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
   }
 
   // random sampling
-  public static FetchOperator createSampler(FetchWork work, HiveConf conf, JobConf job,
+  public static FetchOperator createSampler(FetchWork work, JobConf job,
       Operator<?> operator) throws HiveException {
-    int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
-    float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
+    int sampleNum = HiveConf.getIntVar(job, HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
+    float samplePercent =
+        HiveConf.getFloatVar(job, HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
     if (samplePercent < 0.0 || samplePercent > 1.0) {
       throw new IllegalArgumentException("Percentile value must be within the range of 0 to 1.");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/0e47786f/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index e4f9543..a2cf712 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -376,7 +376,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
 
     if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
       try {
-        handleSampling(driverContext, mWork, job, conf);
+        handleSampling(ctx, mWork, job);
         job.setPartitionerClass(HiveTotalOrderPartitioner.class);
       } catch (IllegalStateException e) {
         console.printInfo("Not enough sampling data.. Rolling back to single reducer task");
@@ -496,7 +496,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     return (returnVal);
   }
 
-  private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
+  private void handleSampling(Context context, MapWork mWork, JobConf job)
       throws Exception {
     assert mWork.getAliasToWork().keySet().size() == 1;
 
@@ -512,7 +512,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       inputPaths.add(new Path(path));
     }
 
-    Path tmpPath = context.getCtx().getExternalTmpPath(inputPaths.get(0));
+    Path tmpPath = context.getExternalTmpPath(inputPaths.get(0));
     Path partitionFile = new Path(tmpPath, ".partitions");
     ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile);
     PartitionKeySampler sampler = new PartitionKeySampler();
@@ -541,9 +541,9 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       fetchWork.setSource(ts);
 
       // random sampling
-      FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
+      FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, job, ts);
       try {
-        ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
+        ts.initialize(job, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
         OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler);
         while (fetcher.pushRow()) { }
       } finally {
@@ -552,7 +552,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     } else {
       throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
    }
-    sampler.writePartitionKeys(partitionFile, conf, job);
+    sampler.writePartitionKeys(partitionFile, job);
  }
 
   /**
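----------------------------------------------------------------------

Gist of the fix: when the job is submitted via a child JVM, ExecDriver.main()
invokes execute() with a bare DriverContext whose getCtx() returns null, so
the old handleSampling() threw a NullPointerException at
context.getCtx().getExternalTmpPath(...). The patch instead passes the
locally initialized Context (ctx) and the always-available JobConf, reading
the sampling settings through HiveConf's static getters, which accept any
Hadoop Configuration. A minimal sketch of that pattern (class name
hypothetical, not from the patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class SamplingConfSketch {
      public static void main(String[] args) {
        // A plain JobConf, as available inside a child JVM; no HiveConf
        // instance needs to exist.
        JobConf job = new JobConf();
        // The static getters read typed Hive settings from any Configuration,
        // falling back to the ConfVars defaults when the key is unset.
        int sampleNum =
            HiveConf.getIntVar(job, HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
        float samplePercent =
            HiveConf.getFloatVar(job, HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
        System.out.println("sampleNum=" + sampleNum
            + ", samplePercent=" + samplePercent);
      }
    }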