Author: jlowe Date: Fri Jan 4 19:34:09 2013 New Revision: 1429049 URL: http://svn.apache.org/viewvc?rev=1429049&view=rev Log: svn merge -c 1429040 FIXES: MAPREDUCE-4832. MR AM can get in a split brain situation. Contributed by Jason Lowe
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/ - copied from r1429040, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/ Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jan 4 19:34:09 2013 @@ -61,6 +61,8 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when running jobs in local mode (Devaraj K via bobby) + MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan 4 19:34:09 2013 @@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -73,6 +74,8 @@ public class TaskAttemptListenerImpl ext private AppContext context; private Server server; protected TaskHeartbeatHandler taskHeartbeatHandler; + private RMHeartbeatHandler rmHeartbeatHandler; + private long commitWindowMs; private InetSocketAddress address; private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap @@ -83,15 +86,19 @@ public class TaskAttemptListenerImpl ext private JobTokenSecretManager jobTokenSecretManager = null; public TaskAttemptListenerImpl(AppContext context, - JobTokenSecretManager jobTokenSecretManager) { + JobTokenSecretManager jobTokenSecretManager, + RMHeartbeatHandler rmHeartbeatHandler) { super(TaskAttemptListenerImpl.class.getName()); this.context = context; this.jobTokenSecretManager = jobTokenSecretManager; + this.rmHeartbeatHandler = rmHeartbeatHandler; } @Override public void init(Configuration conf) { registerHeartbeatHandler(conf); + commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, + MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); super.init(conf); } @@ -172,6 +179,13 @@ public class TaskAttemptListenerImpl ext taskHeartbeatHandler.progressing(attemptID); + // tell task to retry later if AM has not heard from RM within the commit + // window to help avoid double-committing in a split-brain situation + long now = context.getClock().getTime(); + if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) { + return false; + } + Job job = context.getJob(attemptID.getTaskId().getJobId()); Task task = job.getTask(attemptID.getTaskId()); return task.canCommit(attemptID); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jan 4 19:34:09 2013 @@ -87,6 +87,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; @@ -264,18 +265,20 @@ public class MRAppMaster extends Composi addIfService(dispatcher); } + //service to handle requests from JobClient + clientService = createClientService(context); + addIfService(clientService); + + containerAllocator = createContainerAllocator(clientService, context); + //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context); addIfService(taskAttemptListener); - //service to do the task cleanup + //service to handle the output committer committerEventHandler = createCommitterEventHandler(context, committer); addIfService(committerEventHandler); - //service to handle requests from JobClient - clientService = createClientService(context); - addIfService(clientService); - //service to log job history events EventHandler<JobHistoryEvent> historyService = createJobHistoryHandler(context); @@ -303,7 +306,6 @@ public class MRAppMaster extends Composi speculatorEventDispatcher); // service to allocate containers from RM (if non-uber) or to fake it (uber) - containerAllocator = createContainerAllocator(clientService, context); addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); @@ -582,13 +584,15 @@ public class MRAppMaster extends Composi protected TaskAttemptListener createTaskAttemptListener(AppContext context) { TaskAttemptListener lis = - new TaskAttemptListenerImpl(context, jobTokenSecretManager); + new TaskAttemptListenerImpl(context, jobTokenSecretManager, + getRMHeartbeatHandler()); return lis; } protected EventHandler<CommitterEvent> createCommitterEventHandler( AppContext context, OutputCommitter committer) { - return new CommitterEventHandler(context, committer); + return new CommitterEventHandler(context, committer, + getRMHeartbeatHandler()); } protected ContainerAllocator createContainerAllocator( @@ -596,6 +600,10 @@ public class MRAppMaster extends Composi return new ContainerAllocatorRouter(clientService, context); } + protected RMHeartbeatHandler getRMHeartbeatHandler() { + return (RMHeartbeatHandler) containerAllocator; + } + protected ContainerLauncher createContainerLauncher(final AppContext context) { return new ContainerLauncherRouter(context); @@ -663,7 +671,7 @@ public class MRAppMaster extends Composi * happened. */ private final class ContainerAllocatorRouter extends AbstractService - implements ContainerAllocator { + implements ContainerAllocator, RMHeartbeatHandler { private final ClientService clientService; private final AppContext context; private ContainerAllocator containerAllocator; @@ -708,6 +716,16 @@ public class MRAppMaster extends Composi public void setShouldUnregister(boolean shouldUnregister) { ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister); } + + @Override + public long getLastHeartbeatTime() { + return ((RMCommunicator) containerAllocator).getLastHeartbeatTime(); + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { + ((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback); + } } /** Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Fri Jan 4 19:34:09 2013 @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.event.EventHandler; @@ -54,6 +55,7 @@ public class CommitterEventHandler exten private final AppContext context; private final OutputCommitter committer; + private final RMHeartbeatHandler rmHeartbeatHandler; private ThreadPoolExecutor launcherPool; private Thread eventHandlingThread; private BlockingQueue<CommitterEvent> eventQueue = @@ -61,11 +63,14 @@ public class CommitterEventHandler exten private final AtomicBoolean stopped; private Thread jobCommitThread = null; private int commitThreadCancelTimeoutMs; + private long commitWindowMs; - public CommitterEventHandler(AppContext context, OutputCommitter committer) { + public CommitterEventHandler(AppContext context, OutputCommitter committer, + RMHeartbeatHandler rmHeartbeatHandler) { super("CommitterEventHandler"); this.context = context; this.committer = committer; + this.rmHeartbeatHandler = rmHeartbeatHandler; this.stopped = new AtomicBoolean(false); } @@ -75,6 +80,8 @@ public class CommitterEventHandler exten commitThreadCancelTimeoutMs = conf.getInt( MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS); + commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, + MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); } @Override @@ -210,6 +217,7 @@ public class CommitterEventHandler exten protected void handleJobCommit(CommitterJobCommitEvent event) { try { jobCommitStarted(); + waitForValidCommitWindow(); committer.commitJob(event.getJobContext()); context.getEventHandler().handle( new JobCommitCompletedEvent(event.getJobID())); @@ -248,5 +256,26 @@ public class CommitterEventHandler exten new TaskAttemptEvent(event.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE)); } + + private synchronized void waitForValidCommitWindow() + throws InterruptedException { + long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime(); + long now = context.getClock().getTime(); + + while (now - lastHeartbeatTime > commitWindowMs) { + rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() { + @Override + public void run() { + synchronized (EventProcessor.this) { + EventProcessor.this.notify(); + } + } + }); + + wait(); + lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime(); + now = context.getClock().getTime(); + } + } } } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jan 4 19:34:09 2013 @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -61,7 +62,8 @@ import org.apache.hadoop.yarn.service.Ab /** * Registers/unregisters to RM and sends heartbeats to RM. */ -public abstract class RMCommunicator extends AbstractService { +public abstract class RMCommunicator extends AbstractService + implements RMHeartbeatHandler { private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private int rmPollInterval;//millis protected ApplicationId applicationId; @@ -76,6 +78,8 @@ public abstract class RMCommunicator ext private Resource minContainerCapability; private Resource maxContainerCapability; protected Map<ApplicationAccessType, String> applicationACLs; + private volatile long lastHeartbeatTime; + private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -94,6 +98,7 @@ public abstract class RMCommunicator ext this.applicationId = context.getApplicationID(); this.applicationAttemptId = context.getApplicationAttemptId(); this.stopped = new AtomicBoolean(false); + this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>(); } @Override @@ -234,8 +239,12 @@ public abstract class RMCommunicator ext return; } catch (Exception e) { LOG.error("ERROR IN CONTACTING RM. ", e); + continue; // TODO: for other exceptions } + + lastHeartbeatTime = context.getClock().getTime(); + executeHeartbeatCallbacks(); } catch (InterruptedException e) { if (!stopped.get()) { LOG.warn("Allocated thread interrupted. Returning."); @@ -293,6 +302,23 @@ public abstract class RMCommunicator ext protected abstract void heartbeat() throws Exception; + private void executeHeartbeatCallbacks() { + Runnable callback = null; + while ((callback = heartbeatCallbacks.poll()) != null) { + callback.run(); + } + } + + @Override + public long getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { + heartbeatCallbacks.add(callback); + } + public void setShouldUnregister(boolean shouldUnregister) { this.shouldUnregister = shouldUnregister; LOG.info("RMCommunicator notified that shouldUnregistered is: " Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java?rev=1429049&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java Fri Jan 4 19:34:09 2013 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +public interface RMHeartbeatHandler { + long getLastHeartbeatTime(); + + void runOnNextHeartbeat(Runnable callback); +} Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan 4 19:34:09 2013 @@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,7 +43,9 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; @@ -51,8 +55,9 @@ public class TestTaskAttemptListenerImpl public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, + RMHeartbeatHandler rmHeartbeatHandler, TaskHeartbeatHandler hbHandler) { - super(context, jobTokenSecretManager); + super(context, jobTokenSecretManager, rmHeartbeatHandler); this.taskHeartbeatHandler = hbHandler; } @@ -76,9 +81,12 @@ public class TestTaskAttemptListenerImpl public void testGetTask() throws IOException { AppContext appCtx = mock(AppContext.class); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); MockTaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler); + new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler); Configuration conf = new Configuration(); listener.init(conf); listener.start(); @@ -152,9 +160,11 @@ public class TestTaskAttemptListenerImpl AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -191,4 +201,46 @@ public class TestTaskAttemptListenerImpl return tce; } + @Test + public void testCommitWindow() throws IOException { + SystemClock clock = new SystemClock(); + + org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = + mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); + when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); + Job mockJob = mock(Job.class); + when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); + AppContext appCtx = mock(AppContext.class); + when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); + when(appCtx.getClock()).thenReturn(clock); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptListenerImpl listener = + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + @Override + protected void registerHeartbeatHandler(Configuration conf) { + taskHeartbeatHandler = hbHandler; + } + }; + + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + + // verify commit not allowed when RM heartbeat has not occurred recently + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0); + boolean canCommit = listener.canCommit(tid); + assertFalse(canCommit); + verify(mockTask, never()).canCommit(any(TaskAttemptId.class)); + + // verify commit allowed when RM heartbeat is recent + when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime()); + canCommit = listener.canCommit(tid); + assertTrue(canCommit); + verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class)); + + listener.stop(); + } } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Jan 4 19:34:09 2013 @@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; @@ -489,7 +490,8 @@ public class MRApp extends MRAppMaster { return new MRAppContainerAllocator(); } - protected class MRAppContainerAllocator implements ContainerAllocator { + protected class MRAppContainerAllocator + implements ContainerAllocator, RMHeartbeatHandler { private int containerCount; @Override @@ -514,6 +516,16 @@ public class MRApp extends MRAppMaster { new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null)); } + + @Override + public long getLastHeartbeatTime() { + return getContext().getClock().getTime(); + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { + callback.run(); + } } @Override @@ -566,7 +578,8 @@ public class MRApp extends MRAppMaster { } }; - return new CommitterEventHandler(context, stubbedCommitter); + return new CommitterEventHandler(context, stubbedCommitter, + getRMHeartbeatHandler()); } @Override Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Jan 4 19:34:09 2013 @@ -252,7 +252,7 @@ public class TestFail { //task time out is reduced //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure - return new TaskAttemptListenerImpl(getContext(), null) { + return new TaskAttemptListenerImpl(getContext(), null, null) { @Override public void startRpcServer(){}; @Override Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Jan 4 19:34:09 2013 @@ -38,6 +38,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -68,7 +69,9 @@ import org.apache.hadoop.mapreduce.v2.ut import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; +import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -1149,6 +1152,13 @@ public class TestRMContainerAllocator { return context; } + private static AppContext createAppContext( + ApplicationAttemptId appAttemptId, Job job, Clock clock) { + AppContext context = createAppContext(appAttemptId, job); + when(context.getClock()).thenReturn(clock); + return context; + } + private static ClientService createMockClientService() { ClientService service = mock(ClientService.class); when(service.getBindAddress()).thenReturn( @@ -1173,6 +1183,15 @@ public class TestRMContainerAllocator { super.start(); } + public MyContainerAllocator(MyResourceManager rm, Configuration conf, + ApplicationAttemptId appAttemptId, Job job, Clock clock) { + super(createMockClientService(), + createAppContext(appAttemptId, job, clock)); + this.rm = rm; + super.init(conf); + super.start(); + } + @Override protected AMRMProtocol createSchedulerProxy() { return this.rm.getApplicationMasterService(); @@ -1366,6 +1385,66 @@ public class TestRMContainerAllocator { allocator.recalculatedReduceSchedule); } + @Test + public void testHeartbeatHandler() throws Exception { + LOG.info("Running testHeartbeatHandler"); + + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1); + ControlledClock clock = new ControlledClock(new SystemClock()); + AppContext appContext = mock(AppContext.class); + when(appContext.getClock()).thenReturn(clock); + when(appContext.getApplicationID()).thenReturn( + BuilderUtils.newApplicationId(1, 1)); + + RMContainerAllocator allocator = new RMContainerAllocator( + mock(ClientService.class), appContext) { + @Override + protected void register() { + } + @Override + protected AMRMProtocol createSchedulerProxy() { + return mock(AMRMProtocol.class); + } + @Override + protected synchronized void heartbeat() throws Exception { + } + }; + allocator.init(conf); + allocator.start(); + + clock.setTime(5); + int timeToWaitMs = 5000; + while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals(5, allocator.getLastHeartbeatTime()); + clock.setTime(7); + timeToWaitMs = 5000; + while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals(7, allocator.getLastHeartbeatTime()); + + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + allocator.runOnNextHeartbeat(new Runnable() { + @Override + public void run() { + callbackCalled.set(true); + } + }); + clock.setTime(8); + timeToWaitMs = 5000; + while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals(8, allocator.getLastHeartbeatTime()); + Assert.assertTrue(callbackCalled.get()); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Jan 4 19:34:09 2013 @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -166,6 +167,11 @@ import org.junit.Test; } @Override + public RMHeartbeatHandler getRMHeartbeatHandler() { + return getStubbedHeartbeatHandler(getContext()); + } + + @Override protected void sysexit() { } @@ -177,6 +183,7 @@ import org.junit.Test; @Override protected void downloadTokensAndSetupUGI(Configuration conf) { } + } private final class MRAppTestCleanup extends MRApp { @@ -238,6 +245,11 @@ import org.junit.Test; } @Override + public RMHeartbeatHandler getRMHeartbeatHandler() { + return getStubbedHeartbeatHandler(getContext()); + } + + @Override public void cleanupStagingDir() throws IOException { cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; } @@ -247,6 +259,20 @@ import org.junit.Test; } } + private static RMHeartbeatHandler getStubbedHeartbeatHandler( + final AppContext appContext) { + return new RMHeartbeatHandler() { + @Override + public long getLastHeartbeatTime() { + return appContext.getClock().getTime(); + } + @Override + public void runOnNextHeartbeat(Runnable callback) { + callback.run(); + } + }; + } + @Test public void testStagingCleanupOrder() throws Exception { MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1429049&r1=1429040&r2=1429049&view=diff ============================================================================== (empty) Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Jan 4 19:34:09 2013 @@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -502,13 +503,23 @@ public class TestJobImpl { private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { - SystemClock clock = new SystemClock(); + final SystemClock clock = new SystemClock(); AppContext appContext = mock(AppContext.class); when(appContext.getEventHandler()).thenReturn( dispatcher.getEventHandler()); when(appContext.getClock()).thenReturn(clock); + RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() { + @Override + public long getLastHeartbeatTime() { + return clock.getTime(); + } + @Override + public void runOnNextHeartbeat(Runnable callback) { + callback.run(); + } + }; CommitterEventHandler handler = - new CommitterEventHandler(appContext, committer); + new CommitterEventHandler(appContext, committer, heartbeatHandler); dispatcher.register(CommitterEventType.class, handler); return handler; } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Jan 4 19:34:09 2013 @@ -469,6 +469,16 @@ public interface MRJobConfig { 60 * 1000; /** + * Defines a time window in milliseconds for output committer operations. + * If contact with the RM has occurred within this window then commit + * operations are allowed, otherwise the AM will not allow output committer + * operations until contact with the RM has been re-established. + */ + public static final String MR_AM_COMMIT_WINDOW_MS = + MR_AM_PREFIX + "job.committer.commit-window"; + public static final int DEFAULT_MR_AM_COMMIT_WINDOW_MS = 10 * 1000; + + /** * Boolean. Create the base dirs in the JobHistoryEventHandler * Set to false for multi-user clusters. This is an internal config that * is set by the MR framework and read by it too. Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1429049&r1=1429048&r2=1429049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jan 4 19:34:09 2013 @@ -1265,6 +1265,15 @@ </property> <property> + <name>yarn.app.mapreduce.am.job.committer.commit-window</name> + <value>10000</value> + <description>Defines a time window in milliseconds for output commit + operations. If contact with the RM has occurred within this window then + commits are allowed, otherwise the AM will not allow output commits until + contact with the RM has been re-established.</description> +</property> + +<property> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <value>1000</value> <description>The interval in ms at which the MR AppMaster should send