Author: bobby
Date: Wed Jun 20 21:22:33 2012
New Revision: 1352330
URL: http://svn.apache.org/viewvc?rev=1352330&view=rev
Log:
MAPREDUCE-3889. job client tries to use /tasklog interface, but that doesn't
exist anymore (Devaraj K via bobby)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1352330&r1=1352329&r2=1352330&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Jun 20
21:22:33 2012
@@ -595,6 +595,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4270. Move the data_join test classes to the correct path.
(Thomas Graves via sseth)
+ MAPREDUCE-3889. job client tries to use /tasklog interface, but that
+ doesn't exist anymore (Devaraj K via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1352330&r1=1352329&r2=1352330&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
(original)
+++
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
Wed Jun 20 21:22:33 2012
@@ -18,30 +18,19 @@
package org.apache.hadoop.mapreduce;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.URL;
-import java.net.URLConnection;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -1367,14 +1356,6 @@ public class Job extends JobContextImpl
Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
IntegerRanges reduceRanges) throws IOException, InterruptedException {
for (TaskCompletionEvent event : events) {
- TaskCompletionEvent.Status status = event.getStatus();
- if (profiling && shouldDownloadProfile() &&
- (status == TaskCompletionEvent.Status.SUCCEEDED ||
- status == TaskCompletionEvent.Status.FAILED) &&
- (event.isMapTask() ? mapRanges : reduceRanges).
- isIncluded(event.idWithinJob())) {
- downloadProfile(event);
- }
switch (filter) {
case NONE:
break;
@@ -1382,7 +1363,6 @@ public class Job extends JobContextImpl
if (event.getStatus() ==
TaskCompletionEvent.Status.SUCCEEDED) {
LOG.info(event.toString());
- displayTaskLogs(event.getTaskAttemptId(),
event.getTaskTrackerHttp());
}
break;
case FAILED:
@@ -1397,8 +1377,6 @@ public class Job extends JobContextImpl
System.err.println(diagnostics);
}
}
- // Displaying the task logs
- displayTaskLogs(event.getTaskAttemptId(),
event.getTaskTrackerHttp());
}
break;
case KILLED:
@@ -1408,67 +1386,10 @@ public class Job extends JobContextImpl
break;
case ALL:
LOG.info(event.toString());
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
break;
}
}
}
-
- private void downloadProfile(TaskCompletionEvent e) throws IOException {
- URLConnection connection = new URL(
- getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
- "&filter=profile").openConnection();
- InputStream in = connection.getInputStream();
- OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
- IOUtils.copyBytes(in, out, 64 * 1024, true);
- }
-
- private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
- throws IOException {
- // The tasktracker for a 'failed/killed' job might not be around...
- if (baseUrl != null) {
- // Construct the url for the tasklogs
- String taskLogUrl = getTaskLogURL(taskId, baseUrl);
-
- // Copy tasks's stdout of the JobClient
- getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
-
- // Copy task's stderr to stderr of the JobClient
- getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
- }
- }
-
- private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
- OutputStream out) {
- try {
- int tasklogtimeout = cluster.getConf().getInt(
- TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
- URLConnection connection = taskLogUrl.openConnection();
- connection.setReadTimeout(tasklogtimeout);
- connection.setConnectTimeout(tasklogtimeout);
- BufferedReader input =
- new BufferedReader(new InputStreamReader(connection.getInputStream()));
- BufferedWriter output =
- new BufferedWriter(new OutputStreamWriter(out));
- try {
- String logData = null;
- while ((logData = input.readLine()) != null) {
- if (logData.length() > 0) {
- output.write(taskId + ": " + logData + "\n");
- output.flush();
- }
- }
- } finally {
- input.close();
- }
- } catch(IOException ioe) {
- LOG.warn("Error reading task output " + ioe.getMessage());
- }
- }
-
- private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
- return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
- }
/** The interval at which monitorAndPrintJob() prints status */
public static int getProgressPollInterval(Configuration conf) {