Author: cws Date: Mon Sep 10 19:21:03 2012 New Revision: 1383065 URL: http://svn.apache.org/viewvc?rev=1383065&view=rev Log: HIVE-3395. 0.23 compatibility: shim job.tracker.address (Francis Liu via cws)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Mon Sep 10 19:21:03 2012 @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.common.Fil import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; /** @@ -488,7 +489,7 @@ public class Context { * Today this translates into running hadoop jobs locally */ public boolean isLocalOnlyExecutionMode() { - return 
HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local"); + return ShimLoader.getHadoopShims().isLocalMode(conf); } public List<HiveLock> getHiveLocks() { @@ -516,7 +517,7 @@ public class Context { public void restoreOriginalTracker() { if (originalTracker != null) { - HiveConf.setVar(conf, HiveConf.ConfVars.HADOOPJT, originalTracker); + ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, originalTracker); originalTracker = null; } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Mon Sep 10 19:21:03 2012 @@ -373,8 +373,7 @@ public class ExecDriver extends Task<Map try{ MapredLocalWork localwork = work.getMapLocalWork(); if (localwork != null) { - boolean localMode = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local"); - if (!localMode) { + if (!ShimLoader.getHadoopShims().isLocalMode(job)) { Path localPath = new Path(localwork.getTmpFileURI()); Path hdfsPath = new Path(work.getTmpHDFSFileURI()); @@ -706,7 +705,7 @@ public class ExecDriver extends Task<Map OutputStream out = null; Properties deltaP = hconf.getChangedProperties(); - boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local"); + boolean hadoopLocalMode = ShimLoader.getHadoopShims().isLocalMode(hconf); String hadoopSysDir = "mapred.system.dir"; String hadoopWorkDir = "mapred.local.dir"; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1383065&r1=1383064&r2=1383065&view=diff 
============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Mon Sep 10 19:21:03 2012 @@ -423,10 +423,9 @@ public class HadoopJobExecHelper { * from StreamJob.java. */ public void jobInfo(RunningJob rj) { - if (job.get("mapred.job.tracker", "local").equals("local")) { + if (ShimLoader.getHadoopShims().isLocalMode(job)) { console.printInfo("Job running in-process (local Hadoop)"); } else { - String hp = job.get("mapred.job.tracker"); if (SessionState.get() != null) { SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(), Keys.TASK_HADOOP_ID, rj.getJobID()); @@ -434,7 +433,7 @@ public class HadoopJobExecHelper { console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = " + rj.getTrackingURL()); console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) - + " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID()); + + " job -kill " + rj.getJobID()); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java Mon Sep 10 19:21:03 2012 @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.net.NetUtils; @@ -30,7 +31,7 @@ import org.apache.hadoop.net.NetUtils; */ public final 
class JobTrackerURLResolver { public static String getURL(JobConf conf) throws IOException { - String infoAddr = conf.get("mapred.job.tracker.http.address"); + String infoAddr = ShimLoader.getHadoopShims().getJobLauncherHttpAddress(conf); if (infoAddr == null) { throw new IOException("Unable to find job tracker info port."); } @@ -38,7 +39,7 @@ public final class JobTrackerURLResolver int infoPort = infoSocAddr.getPort(); String jobTrackerStr = - conf.get("mapred.job.tracker", "localhost:8012"); + ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf); InetSocketAddress jobTrackerSocAddr = NetUtils.createSocketAddr(jobTrackerStr); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Sep 10 19:21:03 2012 @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.serde2.Ser import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.ReflectionUtils; /** @@ -156,7 +157,6 @@ public class MapJoinOperator extends Abs } } - boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local"); String baseDir = null; String currentInputFile = getExecContext().getCurrentInputFile(); @@ -165,7 +165,7 @@ public class MapJoinOperator extends Abs String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile); try { - if (localMode) { + if 
(ShimLoader.getHadoopShims().isLocalMode(hconf)) { baseDir = this.getExecContext().getLocalWork().getTmpFileURI(); } else { Path[] localArchives; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Mon Sep 10 19:21:03 2012 @@ -118,7 +118,7 @@ public class MapRedTask extends ExecDriv if (reason == null) { // clone configuration before modifying it on per-task basis cloneConf(); - conf.setVar(HiveConf.ConfVars.HADOOPJT, "local"); + ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local"); console.printInfo("Selecting local mode for task: " + getId()); this.setLocalMode(true); } else { @@ -127,8 +127,7 @@ public class MapRedTask extends ExecDriv } } - runningViaChild = - "local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) || + runningViaChild = ShimLoader.getHadoopShims().isLocalMode(conf) || conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD); if(!runningViaChild) { @@ -228,7 +227,7 @@ public class MapRedTask extends ExecDriv Map<String, String> variables = new HashMap(System.getenv()); // The user can specify the hadoop memory - if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) { + if (ShimLoader.getHadoopShims().isLocalMode(conf)) { // if we are running in local mode - then the amount of memory used // by the child jvm can no longer default to the memory used by the // parent jvm Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1383065&r1=1383064&r2=1383065&view=diff 
============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Sep 10 19:21:03 2012 @@ -205,7 +205,7 @@ public final class Utilities { assert jobID != null; gWork = gWorkMap.get(jobID); if (gWork == null) { - String jtConf = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT); + String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job); String path; if (jtConf.equals("local")) { String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN); @@ -354,7 +354,7 @@ public final class Utilities { // Serialize the plan to the default hdfs instance // Except for hadoop local mode execution where we should be // able to get the plan directly from the cache - if (!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) { + if (!ShimLoader.getHadoopShims().isLocalMode(job)) { // Set up distributed cache DistributedCache.createSymlink(job); String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Sep 10 19:21:03 2012 @@ -170,6 +170,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.InputFormat; /** @@ -8310,24 +8311,23 @@ 
public class SemanticAnalyzer extends Ba + numReducers + ", estimated Input: " + estimatedInput); } - if(MapRedTask.isEligibleForLocalMode(conf, numReducers, + if (MapRedTask.isEligibleForLocalMode(conf, numReducers, estimatedInput, inputSummary.getFileCount()) != null) { hasNonLocalJob = true; break; - }else{ + } else { mrtask.setLocalMode(true); - } + } } catch (IOException e) { throw new SemanticException (e); } } if(!hasNonLocalJob) { - // none of the mapred tasks needs to be run locally. That means that the - // query can be executed entirely in local mode. Save the current tracker - // value and restore it when done - ctx.setOriginalTracker(conf.getVar(HiveConf.ConfVars.HADOOPJT)); - conf.setVar(HiveConf.ConfVars.HADOOPJT, "local"); + // Entire query can be run locally. + // Save the current tracker value and restore it when done. + ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf)); + ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf,"local"); console.printInfo("Automatically selecting local only mode for query"); // If all the tasks can be run locally, we can use local disk for Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java Mon Sep 10 19:21:03 2012 @@ -219,7 +219,8 @@ public class QTestUtil { conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, (new Path(dfsUriString, "/build/ql/test/data/warehouse/")).toString()); - conf.setVar(HiveConf.ConfVars.HADOOPJT, "localhost:" + mr.getJobTrackerPort()); + ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, + "localhost:" + mr.getJobTrackerPort()); } } @@ -1234,4 +1235,4 @@ public 
class QTestUtil { + "or try \"ant test ... -Dtest.silent=false\" to get more logs."); System.err.flush(); } -} \ No newline at end of file +} Modified: hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original) +++ hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Mon Sep 10 19:21:03 2012 @@ -604,4 +604,24 @@ public class Hadoop20Shims implements Ha // No such functionality in ancient hadoop return; } + + @Override + public boolean isLocalMode(Configuration conf) { + return "local".equals(getJobLauncherRpcAddress(conf)); + } + + @Override + public String getJobLauncherRpcAddress(Configuration conf) { + return conf.get("mapred.job.tracker"); + } + + @Override + public void setJobLauncherRpcAddress(Configuration conf, String val) { + conf.set("mapred.job.tracker", val); + } + + @Override + public String getJobLauncherHttpAddress(Configuration conf) { + return conf.get("mapred.job.tracker.http.address"); + } } Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original) +++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Mon Sep 10 19:21:03 2012 @@ -73,4 +73,24 @@ public class Hadoop20SShims extends Hado public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job) { return new 
org.apache.hadoop.mapreduce.JobContext(job.getConfiguration(), job.getJobID()); } + + @Override + public boolean isLocalMode(Configuration conf) { + return "local".equals(getJobLauncherRpcAddress(conf)); + } + + @Override + public String getJobLauncherRpcAddress(Configuration conf) { + return conf.get("mapred.job.tracker"); + } + + @Override + public void setJobLauncherRpcAddress(Configuration conf, String val) { + conf.set("mapred.job.tracker", val); + } + + @Override + public String getJobLauncherHttpAddress(Configuration conf) { + return conf.get("mapred.job.tracker.http.address"); + } } Modified: hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original) +++ hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Mon Sep 10 19:21:03 2012 @@ -84,4 +84,33 @@ public class Hadoop23Shims extends Hadoo public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job) { return new JobContextImpl(job.getConfiguration(), job.getJobID()); } + + @Override + public boolean isLocalMode(Configuration conf) { + return "local".equals(conf.get("mapreduce.framework.name")); + } + + @Override + public String getJobLauncherRpcAddress(Configuration conf) { + return conf.get("yarn.resourcemanager.address"); + } + + @Override + public void setJobLauncherRpcAddress(Configuration conf, String val) { + if (val.equals("local")) { + // LocalClientProtocolProvider expects both parameters to be 'local'. 
+ conf.set("mapreduce.framework.name", val); + conf.set("mapreduce.jobtracker.address", val); + } + else { + conf.set("mapreduce.framework.name", "yarn"); + conf.set("yarn.resourcemanager.address", val); + } + } + + @Override + public String getJobLauncherHttpAddress(Configuration conf) { + return conf.get("yarn.resourcemanager.webapp.address"); + } + } Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original) +++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Mon Sep 10 19:21:03 2012 @@ -564,4 +564,16 @@ public abstract class HadoopShimsSecure @Override abstract public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job); + + @Override + abstract public boolean isLocalMode(Configuration conf); + + @Override + abstract public void setJobLauncherRpcAddress(Configuration conf, String val); + + @Override + abstract public String getJobLauncherHttpAddress(Configuration conf); + + @Override + abstract public String getJobLauncherRpcAddress(Configuration conf); } Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1383065&r1=1383064&r2=1383065&view=diff ============================================================================== --- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original) +++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Mon Sep 10 19:21:03 2012 @@ -259,6 +259,37 @@ public interface 
HadoopShims { public JobContext newJobContext(Job job); /** + * Check whether MapReduce is configured to run in local mode. + * @param conf + * @return true if jobs are configured to run in local mode + */ + public boolean isLocalMode(Configuration conf); + + /** + * All retrieval of the jobtracker/resource manager RPC address + * in the configuration should be done through this shim. + * @param conf + * @return the configured jobtracker/resource manager RPC address + */ + public String getJobLauncherRpcAddress(Configuration conf); + + /** + * All updates to the jobtracker/resource manager RPC address + * in the configuration should be done through this shim. + * @param conf + * @param val the new RPC address, or "local" to select local mode + */ + public void setJobLauncherRpcAddress(Configuration conf, String val); + + /** + * All references to the jobtracker/resource manager HTTP address + * in the configuration should be done through this shim. + * @param conf + * @return the configured jobtracker/resource manager HTTP address + */ + public String getJobLauncherHttpAddress(Configuration conf); + + /** + * InputSplitShim. * */