Author: acmurthy
Date: Sat Apr 20 19:24:05 2013
New Revision: 1470219

URL: http://svn.apache.org/r1470219
Log:
Merge -c 1470218 from branch-1 to branch-1.2 to fix MAPREDUCE-5066. Added a timeout for the job.end.notification.url. Contributed by Ivan Mitic.

Added: hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java - copied unchanged from r1470218, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java Modified: hadoop/common/branches/branch-1.2/CHANGES.txt hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Modified: hadoop/common/branches/branch-1.2/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1470219&r1=1470218&r2=1470219&view=diff ============================================================================== --- hadoop/common/branches/branch-1.2/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.2/CHANGES.txt Sat Apr 20 19:24:05 2013 @@ -565,6 +565,9 @@ Release 1.2.0 - 2013.04.16 HADOOP-9473. Typo in FileUtil copy() method. (Glen Mazza via suresh) + MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan + Mitic via acmurthy) + Release 1.1.2 - 2013.01.30 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=1470219&r1=1470218&r2=1470219&view=diff ============================================================================== --- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original) +++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Sat Apr 20 19:24:05 2013 @@ -39,51 +39,51 @@ public class JobEndNotifier { private static volatile boolean running; private static BlockingQueue<JobEndStatusInfo> queue = new DelayQueue<JobEndStatusInfo>(); + // Set default timeout to 5 seconds + public static final int MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT = 5000; public static void startNotifier() { running = true; thread = new Thread( - new Runnable() { - public 
void run() { - try { - while (running) { - sendNotification(queue.take()); - } - } - catch (InterruptedException irex) { - if (running) { - LOG.error("Thread has ended unexpectedly", irex); - } - } - } - - private void sendNotification(JobEndStatusInfo notification) { - try { - int code = httpNotification(notification.getUri()); - if (code != 200) { - throw new IOException("Invalid response status code: " + code); - } - } - catch (IOException ioex) { - LOG.error("Notification failure [" + notification + "]", ioex); - if (notification.configureForRetry()) { - try { - queue.put(notification); - } - catch (InterruptedException iex) { - LOG.error("Notification queuing error [" + notification + "]", - iex); - } - } - } - catch (Exception ex) { - LOG.error("Notification failure [" + notification + "]", ex); - } - } - - } + new Runnable() { + public void run() { + try { + while (running) { + sendNotification(queue.take()); + } + } + catch (InterruptedException irex) { + if (running) { + LOG.error("Thread has ended unexpectedly", irex); + } + } + } - ); + private void sendNotification(JobEndStatusInfo notification) { + try { + int code = httpNotification(notification.getUri(), + notification.getTimeout()); + if (code != 200) { + throw new IOException("Invalid response status code: " + code); + } + } + catch (IOException ioex) { + LOG.error("Notification failure [" + notification + "]", ioex); + if (notification.configureForRetry()) { + try { + queue.put(notification); + } + catch (InterruptedException iex) { + LOG.error("Notification queuing error [" + notification + "]", + iex); + } + } + } + catch (Exception ex) { + LOG.error("Notification failure [" + notification + "]", ex); + } + } + }); thread.start(); } @@ -97,9 +97,10 @@ public class JobEndNotifier { JobEndStatusInfo notification = null; String uri = conf.getJobEndNotificationURI(); if (uri != null) { - // +1 to make logic for first notification identical to a retry - int retryAttempts = 
conf.getInt("job.end.retry.attempts", 0) + 1; + int retryAttempts = conf.getInt("job.end.retry.attempts", 0); long retryInterval = conf.getInt("job.end.retry.interval", 30000); + int timeout = conf.getInt("mapreduce.job.end-notification.timeout", + MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT); if (uri.contains("$jobId")) { uri = uri.replace("$jobId", status.getJobID().toString()); } @@ -109,7 +110,8 @@ public class JobEndNotifier { (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED"; uri = uri.replace("$jobStatus", statusStr); } - notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval); + notification = new JobEndStatusInfo( + uri, retryAttempts, retryInterval, timeout); } return notification; } @@ -126,12 +128,17 @@ public class JobEndNotifier { } } - private static int httpNotification(String uri) throws IOException { + private static int httpNotification(String uri, int timeout) + throws IOException { URI url = new URI(uri, false); - HttpClient m_client = new HttpClient(); + + HttpClient httpClient = new HttpClient(); + httpClient.getParams().setSoTimeout(timeout); + httpClient.getParams().setConnectionManagerTimeout(timeout); + HttpMethod method = new GetMethod(url.getEscapedURI()); method.setRequestHeader("Accept", "*/*"); - return m_client.executeMethod(method); + return httpClient.executeMethod(method); } // for use by the LocalJobRunner, without using a thread&queue, @@ -139,9 +146,10 @@ public class JobEndNotifier { public static void localRunnerNotification(JobConf conf, JobStatus status) { JobEndStatusInfo notification = createNotification(conf, status); if (notification != null) { - while (notification.configureForRetry()) { + do { try { - int code = httpNotification(notification.getUri()); + int code = httpNotification(notification.getUri(), + notification.getTimeout()); if (code != 200) { throw new IOException("Invalid response status code: " + code); } @@ -157,13 +165,13 @@ public class JobEndNotifier { } try { 
synchronized (Thread.currentThread()) { - Thread.currentThread().sleep(notification.getRetryInterval()); + Thread.sleep(notification.getRetryInterval()); } } catch (InterruptedException iex) { LOG.error("Notification retry error [" + notification + "]", iex); } - } + } while (notification.configureForRetry()); } } @@ -172,11 +180,14 @@ public class JobEndNotifier { private int retryAttempts; private long retryInterval; private long delayTime; + private int timeout; - JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) { + JobEndStatusInfo(String uri, int retryAttempts, long retryInterval, + int timeout) { this.uri = uri; this.retryAttempts = retryAttempts; this.retryInterval = retryInterval; + this.timeout = timeout; this.delayTime = System.currentTimeMillis(); } @@ -192,6 +203,10 @@ public class JobEndNotifier { return retryInterval; } + public int getTimeout() { + return timeout; + } + public long getDelayTime() { return delayTime; }