Author: acmurthy Date: Tue Apr 12 01:14:44 2011 New Revision: 1091273 URL: http://svn.apache.org/viewvc?rev=1091273&view=rev Log: MAPREDUCE-2429. Validate JVM in TaskUmbilicalProtocol. Contributed by Siddharth Seth.
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Apr 12 01:14:44 2011 @@ -4,6 +4,9 @@ Release 0.20.204.0 - unreleased BUG FIXES + MAPREDUCE-2429. Validate JVM in TaskUmbilicalProtocol. (Siddharth Seth via + acmurthy) + MAPREDUCE-2418. Show job errors in JobHistory page. (Siddharth Seth via acmurthy) Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java Tue Apr 12 01:14:44 2011 @@ -169,6 +169,7 @@ class Child { UserGroupInformation childUGI = null; + final JvmContext jvmContext = context; try { while (true) { taskid = null; @@ -250,13 +251,14 @@ class Child { // Create a final reference to the task for the doAs block final Task taskFinal = task; + taskFinal.setJvmContext(jvmContext); childUGI.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { try { // use job-specified working directory FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory()); - taskFinal.run(job, umbilical); // run the task + taskFinal.run(job, umbilical); // run the task } finally { TaskLog.syncLogs (logLocation, taskid, isCleanup, logIsSegmented(job)); @@ -275,7 +277,7 @@ class Child { } } catch (FSError e) { LOG.fatal("FSError from child", e); - umbilical.fsError(taskid, e.getMessage()); + umbilical.fsError(taskid, e.getMessage(), jvmContext); } catch (Exception exception) { LOG.warn("Error running child", exception); try { @@ -301,7 +303,7 @@ class Child { ByteArrayOutputStream baos = new ByteArrayOutputStream(); exception.printStackTrace(new PrintStream(baos)); if (taskid != null) { - umbilical.reportDiagnosticInfo(taskid, baos.toString()); + umbilical.reportDiagnosticInfo(taskid, baos.toString(), jvmContext); } } catch (Throwable throwable) { LOG.fatal("Error running child : " @@ -311,7 +313,7 @@ class Child { String cause = tCause == null ? throwable.getMessage() : StringUtils.stringifyException(tCause); - umbilical.fatalError(taskid, cause); + umbilical.fatalError(taskid, cause, jvmContext); } } finally { RPC.stopProxy(umbilical); Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Apr 12 01:14:44 2011 @@ -53,19 +53,23 @@ public class IsolationRunner { return TaskUmbilicalProtocol.versionID; } - public void done(TaskAttemptID taskid) throws IOException { + public void done(TaskAttemptID taskid, JvmContext jvmContext) + throws IOException { LOG.info("Task " + taskid + " reporting done."); } - public void fsError(TaskAttemptID taskId, String message) throws IOException { + public void fsError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { LOG.info("Task " + taskId + " reporting file system error: " + message); } - public void shuffleError(TaskAttemptID taskId, String message) throws IOException { + public void shuffleError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { LOG.info("Task " + taskId + " reporting shuffle error: " + message); } - public void fatalError(TaskAttemptID taskId, String msg) throws IOException{ + public void fatalError(TaskAttemptID taskId, String msg, + JvmContext jvmContext) throws IOException { LOG.info("Task " + taskId + " reporting fatal error: " + msg); } @@ -73,20 +77,21 @@ public class IsolationRunner { return null; } - public boolean ping(TaskAttemptID taskid) throws IOException { + public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException { return true; } - public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) - throws IOException, InterruptedException { - statusUpdate(taskId, taskStatus); + public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus, + JvmContext jvmContext) throws IOException, InterruptedException { + statusUpdate(taskId, taskStatus, jvmContext); } - public boolean canCommit(TaskAttemptID taskid) throws IOException { + public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) + throws IOException { return true; } - public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) + public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context) throws IOException, InterruptedException { StringBuffer buf = new StringBuffer("Task "); buf.append(taskId); @@ -103,18 +108,20 @@ public class IsolationRunner { return true; } - public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException { + public void reportDiagnosticInfo(TaskAttemptID taskid, String trace, + JvmContext jvmContext) throws IOException { LOG.info("Task " + taskid + " has problem " + trace); } - public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, - int fromEventId, int maxLocs, TaskAttemptID id) throws IOException { - return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, - false); + public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, + int fromEventId, int maxLocs, TaskAttemptID id, JvmContext jvmContext) + throws IOException { + return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, + false); } public void reportNextRecordRange(TaskAttemptID taskid, - SortedRanges.Range range) throws IOException { + SortedRanges.Range range, JvmContext jvmContext) throws IOException { LOG.info("Task " + taskid + " reportedNextRecordRange " + range); } Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java Tue Apr 12 01:14:44 2011 @@ -128,6 +128,14 @@ class JvmManager { } } + public boolean validateTipToJvm(TaskInProgress tip, JVMId jvmId) { + if (jvmId.isMapJVM()) { + return mapJvmManager.validateTipToJvm(tip, jvmId); + } else { + return reduceJvmManager.validateTipToJvm(tip, jvmId); + } + } + public TaskInProgress getTaskForJvm(JVMId jvmId) throws IOException { if (jvmId.isMapJVM()) { @@ -223,6 +231,23 @@ class JvmManager { jvmIdToRunner.get(jvmId).setBusy(true); } + synchronized public boolean validateTipToJvm(TaskInProgress tip, JVMId jvmId) { + if (jvmId == null) { + LOG.warn("Null jvmId. Cannot verify Jvm. validateTipToJvm returning false"); + return false; + } + TaskRunner taskRunner = jvmToRunningTask.get(jvmId); + if (taskRunner == null) { + return false; //JvmId not known. + } + TaskInProgress knownTip = taskRunner.getTaskInProgress(); + if (knownTip == tip) { // Valid to compare the addresses ? (or equals) + return true; + } else { + return false; + } + } + synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) throws IOException { if (jvmToRunningTask.containsKey(jvmId)) { Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Tue Apr 12 01:14:44 2011 @@ -311,7 +311,7 @@ class LocalJobRunner implements JobSubmi public JvmTask getTask(JvmContext context) { return null; } - public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) + public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context) throws IOException, InterruptedException { LOG.info(taskStatus.getStateString()); float taskIndex = mapIds.indexOf(taskId); @@ -333,9 +333,10 @@ class LocalJobRunner implements JobSubmi * and it is waiting for the commit Response */ public void commitPending(TaskAttemptID taskid, - TaskStatus taskStatus) + TaskStatus taskStatus, + JvmContext jvmContext) throws IOException, InterruptedException { - statusUpdate(taskid, taskStatus); + statusUpdate(taskid, taskStatus, jvmContext); } /** @@ -347,51 +348,55 @@ class LocalJobRunner implements JobSubmi completedTaskCounters.incrAllCounters(task.getCounters()); } - public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) { + public void reportDiagnosticInfo(TaskAttemptID taskid, String trace, + JvmContext jvmContext) { // Ignore for now } public void reportNextRecordRange(TaskAttemptID taskid, - SortedRanges.Range range) throws IOException { + SortedRanges.Range range, JvmContext jvmContext) throws IOException { LOG.info("Task " + taskid + " reportedNextRecordRange " + range); } - public boolean ping(TaskAttemptID taskid) throws IOException { + public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException { return true; } - public boolean canCommit(TaskAttemptID taskid) + public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) throws IOException { return true; } - public void done(TaskAttemptID taskId) throws IOException { + public void done(TaskAttemptID taskId, JvmContext jvmContext) + throws IOException { int taskIndex = mapIds.indexOf(taskId); - if (taskIndex >= 0) { // mapping + if (taskIndex >= 0) { // mapping status.setMapProgress(1.0f); } else { status.setReduceProgress(1.0f); } } - public synchronized void fsError(TaskAttemptID taskId, String message) - throws IOException { - LOG.fatal("FSError: "+ message + "from task: " + taskId); + public synchronized void fsError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { + LOG.fatal("FSError: " + message + "from task: " + taskId); } - public void shuffleError(TaskAttemptID taskId, String message) throws IOException { - LOG.fatal("shuffleError: "+ message + "from task: " + taskId); + public void shuffleError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { + LOG.fatal("shuffleError: " + message + "from task: " + taskId); } - public synchronized void fatalError(TaskAttemptID taskId, String msg) - throws IOException { - LOG.fatal("Fatal: "+ msg + "from task: " + taskId); + public synchronized void fatalError(TaskAttemptID taskId, String msg, + JvmContext jvmContext) throws IOException { + LOG.fatal("Fatal: " + msg + "from task: " + taskId); } - public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, - int fromEventId, int maxLocs, TaskAttemptID id) throws IOException { + public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, + int fromEventId, int maxLocs, TaskAttemptID id, JvmContext jvmContext) + throws IOException { return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, - false); + false); } @Override Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Apr 12 01:14:44 2011 @@ -341,12 +341,13 @@ class MapTask extends Task { } @Override - public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) + public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; // start thread that will handle communication with parent - TaskReporter reporter = new TaskReporter(getProgress(), umbilical); + TaskReporter reporter = new TaskReporter(getProgress(), umbilical, + jvmContext); reporter.startCommunicationThread(); boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Apr 12 01:14:44 2011 @@ -356,7 +356,8 @@ class ReduceTask extends Task { reducePhase = getProgress().addPhase("reduce"); } // start thread that will handle communication with parent - TaskReporter reporter = new TaskReporter(getProgress(), umbilical); + TaskReporter reporter = new TaskReporter(getProgress(), umbilical, + jvmContext); reporter.startCommunicationThread(); boolean useNewApi = job.getUseNewReducer(); initialize(job, getJobID(), reporter, useNewApi); @@ -1330,7 +1331,7 @@ class ReduceTask extends Task { LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " + StringUtils.stringifyException(e)); try { - umbilical.fsError(reduceTask.getTaskID(), e.getMessage()); + umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext); } catch (IOException io) { LOG.error("Could not notify TT of FSError: " + StringUtils.stringifyException(io)); @@ -2299,7 +2300,7 @@ class ReduceTask extends Task { "Killing task " + getTaskID() + "."); umbilical.shuffleError(getTaskID(), "Exceeded MAX_FAILED_UNIQUE_FETCHES;" - + " bailing-out."); + + " bailing-out.", jvmContext); } } @@ -2857,7 +2858,7 @@ class ReduceTask extends Task { umbilical.getMapCompletionEvents(reduceTask.getJobID(), fromEventId.get(), MAX_EVENTS_TO_FETCH, - reduceTask.getTaskID()); + reduceTask.getTaskID(), jvmContext); TaskCompletionEvent events[] = update.getMapTaskCompletionEvents(); // Check if the reset is required. Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java Tue Apr 12 01:14:44 2011 @@ -154,6 +154,7 @@ abstract public class Task implements Wr private String pidFile = ""; protected TaskUmbilicalProtocol umbilical; protected SecretKey tokenSecret; + protected JvmContext jvmContext; //////////////////////////////////////////// // Constructors @@ -220,6 +221,21 @@ abstract public class Task implements Wr return this.tokenSecret; } + /** + * Set the task JvmContext + * @param jvmContext + */ + public void setJvmContext(JvmContext jvmContext) { + this.jvmContext = jvmContext; + } + + /** + * Gets the task JvmContext + * @return the jvm context + */ + public JvmContext getJvmContext() { + return this.jvmContext; + } /** * Get the index of this task within the job. @@ -269,7 +285,7 @@ abstract public class Task implements Wr ? StringUtils.stringifyException(throwable) : StringUtils.stringifyException(tCause); try { - umbilical.fatalError(id, cause); + umbilical.fatalError(id, cause, jvmContext); } catch (IOException ioe) { LOG.fatal("Failed to contact the tasktracker", ioe); System.exit(-1); @@ -446,8 +462,7 @@ abstract public class Task implements Wr * @param umbilical for progress reports */ public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical) - throws IOException, ClassNotFoundException, InterruptedException; - + throws IOException, ClassNotFoundException, InterruptedException; /** Return an approprate thread runner for this task. * @param tip TODO*/ @@ -509,6 +524,7 @@ abstract public class Task implements Wr private TaskUmbilicalProtocol umbilical; private InputSplit split = null; private Progress taskProgress; + private JvmContext jvmContext; private Thread pingThread = null; private static final int PROGRESS_STATUS_LEN_LIMIT = 512; private boolean done = true; @@ -522,9 +538,10 @@ abstract public class Task implements Wr private AtomicBoolean progressFlag = new AtomicBoolean(false); TaskReporter(Progress taskProgress, - TaskUmbilicalProtocol umbilical) { + TaskUmbilicalProtocol umbilical, JvmContext jvmContext) { this.umbilical = umbilical; this.taskProgress = taskProgress; + this.jvmContext = jvmContext; } // getters and setters for flag void setProgressFlag() { @@ -630,12 +647,12 @@ abstract public class Task implements Wr taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), counters); - taskFound = umbilical.statusUpdate(taskId, taskStatus); + taskFound = umbilical.statusUpdate(taskId, taskStatus, jvmContext); taskStatus.clearStatus(); } else { // send ping - taskFound = umbilical.ping(taskId); + taskFound = umbilical.ping(taskId, jvmContext); } // if Task Tracker is not aware of our task ID (probably because it died and @@ -709,7 +726,7 @@ abstract public class Task implements Wr if (LOG.isDebugEnabled()) { LOG.debug("sending reportNextRecordRange " + range); } - umbilical.reportNextRecordRange(taskId, range); + umbilical.reportNextRecordRange(taskId, range, jvmContext); } /** @@ -783,7 +800,7 @@ abstract public class Task implements Wr // say the task tracker that task is commit pending while (true) { try { - umbilical.commitPending(taskId, taskStatus); + umbilical.commitPending(taskId, taskStatus, jvmContext); break; } catch (InterruptedException ie) { // ignore @@ -826,7 +843,7 @@ abstract public class Task implements Wr int retries = MAX_RETRIES; while (true) { try { - if (!umbilical.statusUpdate(getTaskID(), taskStatus)) { + if (!umbilical.statusUpdate(getTaskID(), taskStatus, jvmContext)) { LOG.warn("Parent died. Exiting "+taskId); System.exit(66); } @@ -883,7 +900,7 @@ abstract public class Task implements Wr int retries = MAX_RETRIES; while (true) { try { - umbilical.done(getTaskID()); + umbilical.done(getTaskID(), jvmContext); LOG.info("Task '" + taskId + "' done."); return; } catch (IOException ie) { @@ -903,7 +920,7 @@ abstract public class Task implements Wr int retries = MAX_RETRIES; while (true) { try { - while (!umbilical.canCommit(taskId)) { + while (!umbilical.canCommit(taskId, jvmContext)) { try { Thread.sleep(1000); } catch(InterruptedException ie) { Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Apr 12 01:14:44 2011 @@ -2980,6 +2980,12 @@ public class TaskTracker implements MRCo } } + private void validateJVM(TaskInProgress tip, JvmContext jvmContext, TaskAttemptID taskid) throws IOException { + if (!jvmManager.validateTipToJvm(tip, jvmContext.jvmId)) { + throw new IOException("JvmValidate Failed. Ignoring request from task: " + taskid + ", with JvmId: " + jvmContext.jvmId); + } + } + private void authorizeJVM(org.apache.hadoop.mapreduce.JobID jobId) throws IOException { String currentJobId = @@ -3039,11 +3045,13 @@ public class TaskTracker implements MRCo * Called periodically to report Task progress, from 0.0 to 1.0. */ public synchronized boolean statusUpdate(TaskAttemptID taskid, - TaskStatus taskStatus) + TaskStatus taskStatus, + JvmContext jvmContext) throws IOException { authorizeJVM(taskid.getJobID()); TaskInProgress tip = tasks.get(taskid); if (tip != null) { + validateJVM(tip, jvmContext, taskid); tip.reportProgress(taskStatus); return true; } else { @@ -3056,9 +3064,16 @@ public class TaskTracker implements MRCo * Called when the task dies before completion, and we want to report back * diagnostic info */ - public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException { + public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, + String info, JvmContext jvmContext) throws IOException { authorizeJVM(taskid.getJobID()); - reportDiagnosticInfoInternal(taskid, info); + TaskInProgress tip = tasks.get(taskid); + if (tip != null) { + validateJVM(tip, jvmContext, taskid); + tip.reportDiagnosticInfo(info); + } else { + LOG.warn("Error from unknown child task: "+taskid+". Ignored."); + } } /** * Meant to be used internally @@ -3077,10 +3092,11 @@ public class TaskTracker implements MRCo } public synchronized void reportNextRecordRange(TaskAttemptID taskid, - SortedRanges.Range range) throws IOException { + SortedRanges.Range range, JvmContext jvmContext) throws IOException { authorizeJVM(taskid.getJobID()); TaskInProgress tip = tasks.get(taskid); if (tip != null) { + validateJVM(tip, jvmContext, taskid); tip.reportNextRecordRange(range); } else { LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " + @@ -3088,10 +3104,17 @@ public class TaskTracker implements MRCo } } - /** Child checking to see if we're alive. Normally does nothing.*/ - public synchronized boolean ping(TaskAttemptID taskid) throws IOException { + /** Child checking to see if we're alive. Normally does nothing. */ + public synchronized boolean ping(TaskAttemptID taskid, JvmContext jvmContext) + throws IOException { authorizeJVM(taskid.getJobID()); - return tasks.get(taskid) != null; + TaskInProgress tip = tasks.get(taskid); + if (tip != null) { + validateJVM(tip, jvmContext, taskid); + return true; + } else { + return false; + } } /** @@ -3099,33 +3122,38 @@ public class TaskTracker implements MRCo * and it is waiting for the commit Response */ public synchronized void commitPending(TaskAttemptID taskid, - TaskStatus taskStatus) + TaskStatus taskStatus, + JvmContext jvmContext) throws IOException { authorizeJVM(taskid.getJobID()); LOG.info("Task " + taskid + " is in commit-pending," +"" + " task state:" +taskStatus.getRunState()); - statusUpdate(taskid, taskStatus); + // validateJVM is done in statusUpdate + statusUpdate(taskid, taskStatus, jvmContext); reportTaskFinished(taskid, true); } /** * Child checking whether it can commit */ - public synchronized boolean canCommit(TaskAttemptID taskid) - throws IOException { + public synchronized boolean canCommit(TaskAttemptID taskid, + JvmContext jvmContext) throws IOException { authorizeJVM(taskid.getJobID()); - return commitResponses.contains(taskid); //don't remove it now + TaskInProgress tip = tasks.get(taskid); + validateJVM(tip, jvmContext, taskid); + return commitResponses.contains(taskid); // don't remove it now } /** * The task is done. */ - public synchronized void done(TaskAttemptID taskid) + public synchronized void done(TaskAttemptID taskid, JvmContext jvmContext) throws IOException { authorizeJVM(taskid.getJobID()); TaskInProgress tip = tasks.get(taskid); - commitResponses.remove(taskid); if (tip != null) { + validateJVM(tip, jvmContext, taskid); + commitResponses.remove(taskid); tip.reportDone(); } else { LOG.warn("Unknown child task done: "+taskid+". Ignored."); @@ -3136,22 +3164,36 @@ public class TaskTracker implements MRCo /** * A reduce-task failed to shuffle the map-outputs. Kill the task. */ - public synchronized void shuffleError(TaskAttemptID taskId, String message) + public synchronized void shuffleError(TaskAttemptID taskId, String message, JvmContext jvmContext) throws IOException { authorizeJVM(taskId.getJobID()); - LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message); TaskInProgress tip = runningTasks.get(taskId); - tip.reportDiagnosticInfo("Shuffle Error: " + message); - purgeTask(tip, true); + if (tip != null) { + validateJVM(tip, jvmContext, taskId); + LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + + message); + tip.reportDiagnosticInfo("Shuffle Error: " + message); + purgeTask(tip, true); + } else { + LOG.warn("Unknown child task shuffleError: " + taskId + ". Ignored."); + } } /** * A child task had a local filesystem error. Kill the task. */ - public synchronized void fsError(TaskAttemptID taskId, String message) - throws IOException { + public synchronized void fsError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { authorizeJVM(taskId.getJobID()); - fsErrorInternal(taskId, message); + TaskInProgress tip = runningTasks.get(taskId); + if (tip != null) { + validateJVM(tip, jvmContext, taskId); + LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message); + tip.reportDiagnosticInfo("FSError: " + message); + purgeTask(tip, true); + } else { + LOG.warn("Unknown child task fsError: "+taskId+". Ignored."); + } } /** * Meant to be used internally @@ -3170,18 +3212,29 @@ public class TaskTracker implements MRCo /** * A child task had a fatal error. Kill the task. */ - public synchronized void fatalError(TaskAttemptID taskId, String msg) - throws IOException { + public synchronized void fatalError(TaskAttemptID taskId, String msg, + JvmContext jvmContext) throws IOException { authorizeJVM(taskId.getJobID()); - LOG.fatal("Task: " + taskId + " - Killed : " + msg); TaskInProgress tip = runningTasks.get(taskId); - tip.reportDiagnosticInfo("Error: " + msg); - purgeTask(tip, true); + if (tip != null) { + validateJVM(tip, jvmContext, taskId); + LOG.fatal("Task: " + taskId + " - Killed : " + msg); + tip.reportDiagnosticInfo("Error: " + msg); + purgeTask(tip, true); + } else { + LOG.warn("Unknown child task fatalError: "+taskId+". Ignored."); + } } public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents( - JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) - throws IOException { + JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id, + JvmContext jvmContext) throws IOException { + TaskInProgress tip = runningTasks.get(id); + if (tip == null) { + throw new IOException("Unknown task; " + id + + ". Ignoring getMapCompletionEvents Request"); + } + validateJVM(tip, jvmContext, id); authorizeJVM(jobId); TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY; synchronized (shouldReset) { Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Apr 12 01:14:44 2011 @@ -61,7 +61,7 @@ public interface TaskUmbilicalProtocol e * Version 18 Added fatalError for child to communicate fatal errors to TT * */ - public static final long versionID = 18L; + public static final long versionID = 19L; /** * Called when a child task process starts, to get its task. @@ -77,66 +77,78 @@ public interface TaskUmbilicalProtocol e * * @param taskId task-id of the child * @param taskStatus status of the child + * @param jvmContext context the jvmContext running the task. * @throws IOException * @throws InterruptedException * @return True if the task is known */ - boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) - throws IOException, InterruptedException; + boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, + JvmContext context) throws IOException, InterruptedException; /** Report error messages back to parent. Calls should be sparing, since all * such messages are held in the job tracker. * @param taskid the id of the task involved * @param trace the text to report + * @param jvmContext context the jvmContext running the task. */ - void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException; + void reportDiagnosticInfo(TaskAttemptID taskid, String trace, + JvmContext jvmContext) throws IOException; /** * Report the record range which is going to process next by the Task. * @param taskid the id of the task involved * @param range the range of record sequence nos + * @param jvmContext context the jvmContext running the task. * @throws IOException */ - void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) - throws IOException; + void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range, + JvmContext jvmContext) throws IOException; - /** Periodically called by child to check if parent is still alive. + /** Periodically called by child to check if parent is still alive. + * @param taskid the id of the task involved + * @param jvmContext context the jvmContext running the task. * @return True if the task is known */ - boolean ping(TaskAttemptID taskid) throws IOException; + boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException; /** Report that the task is successfully completed. Failure is assumed if * the task process exits without calling this. * @param taskid task's id + * @param jvmContext context the jvmContext running the task. */ - void done(TaskAttemptID taskid) throws IOException; + void done(TaskAttemptID taskid, JvmContext jvmContext) throws IOException; /** * Report that the task is complete, but its commit is pending. * * @param taskId task's id * @param taskStatus status of the child + * @param jvmContext context the jvmContext running the task. * @throws IOException */ - void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) - throws IOException, InterruptedException; + void commitPending(TaskAttemptID taskId, TaskStatus taskStatus, + JvmContext jvmContext) throws IOException, InterruptedException; /** * Polling to know whether the task can go-ahead with commit * @param taskid + * @param jvmContext context the jvmContext running the task. * @return true/false * @throws IOException */ - boolean canCommit(TaskAttemptID taskid) throws IOException; + boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) throws IOException; - /** Report that a reduce-task couldn't shuffle map-outputs.*/ - void shuffleError(TaskAttemptID taskId, String message) throws IOException; + /** Report that a reduce-task couldn't shuffle map-outputs. */ + void shuffleError(TaskAttemptID taskId, String message, JvmContext jvmContext) + throws IOException; /** Report that the task encounted a local filesystem error.*/ - void fsError(TaskAttemptID taskId, String message) throws IOException; + void fsError(TaskAttemptID taskId, String message, JvmContext jvmContext) + throws IOException; /** Report that the task encounted a fatal error.*/ - void fatalError(TaskAttemptID taskId, String message) throws IOException; + void fatalError(TaskAttemptID taskId, String message, JvmContext jvmContext) + throws IOException; /** Called by a reduce task to get the map output locations for finished maps. * Returns an update centered around the map-task-completion-events. @@ -154,7 +166,8 @@ public interface TaskUmbilicalProtocol e MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, int fromIndex, int maxLocs, - TaskAttemptID id) + TaskAttemptID id, + JvmContext jvmContext) throws IOException; /** Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Apr 12 01:14:44 2011 @@ -581,7 +581,7 @@ public class MiniMRCluster { new TaskAttemptID(jtId, jobId.getId(), false, 0, 0); return taskTrackerList.get(index).getTaskTracker() .getMapCompletionEvents(jobId, 0, max, - dummy); + dummy, null); } /** Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java (original) +++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java Tue Apr 12 01:14:44 2011 @@ -99,32 +99,37 @@ public class TestTaskCommit extends Hado boolean taskDone = false; @Override - public boolean canCommit(TaskAttemptID taskid) throws IOException { + public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) + throws IOException { return false; } @Override - public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) - throws IOException, InterruptedException { + public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus, + JvmContext jvmContext) throws IOException, InterruptedException { fail("Task should not go to commit-pending"); } @Override - public void done(TaskAttemptID taskid) throws IOException { + public void done(TaskAttemptID taskid, JvmContext jvmContext) + throws IOException { taskDone = true; } @Override - public void fatalError(TaskAttemptID taskId, String message) - throws IOException { } + public void fatalError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { + } @Override - public void fsError(TaskAttemptID taskId, String message) - throws IOException { } + public void fsError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { + } @Override public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, - int fromIndex, int maxLocs, TaskAttemptID id) throws IOException { + int fromIndex, int maxLocs, TaskAttemptID id, JvmContext jvmContext) + throws IOException { return null; } @@ -134,28 +139,29 @@ public class TestTaskCommit extends Hado } @Override - public boolean ping(TaskAttemptID taskid) throws IOException { + public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) + throws IOException { return true; } @Override - public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) - throws IOException { + public void reportDiagnosticInfo(TaskAttemptID taskid, String trace, + JvmContext jvmContext) throws IOException { } @Override - public void reportNextRecordRange(TaskAttemptID taskid, Range range) - throws IOException { + public void reportNextRecordRange(TaskAttemptID taskid, Range range, + JvmContext jvmContext) throws IOException { } @Override - public void shuffleError(TaskAttemptID taskId, String message) - throws IOException { + public void shuffleError(TaskAttemptID taskId, String message, + JvmContext jvmContext) throws IOException { } @Override - public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) - throws IOException, InterruptedException { + public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, + JvmContext jvmContext) throws IOException, InterruptedException { return true; } @@ -166,9 +172,9 @@ public class TestTaskCommit extends Hado } @Override - public void - updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId, - long[] sizes) throws IOException { + public void updatePrivateDistributedCacheSizes( + org.apache.hadoop.mapreduce.JobID jobId, long[] sizes) + throws IOException { // NOTHING } } Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1091273&r1=1091272&r2=1091273&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original) +++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Tue Apr 12 01:14:44 2011 @@ -102,7 +102,7 @@ public class TestUmbilicalProtocolWithJo proxy = (TaskUmbilicalProtocol) RPC.getProxy( TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID, addr, conf); - proxy.ping(null); + proxy.ping(null, null); } finally { server.stop(); if (proxy != null) {