Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 2fce0fa8d -> d665d8568
YARN-5547. NMLeveldbStateStore should be more tolerant of unknown keys. Contributed by Ajith S. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d665d856 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d665d856 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d665d856 Branch: refs/heads/branch-2.8 Commit: d665d856825a6aaf48632a1826e1ca10608f9c10 Parents: 2fce0fa Author: Junping Du <junping...@apache.org> Authored: Thu Aug 31 18:10:33 2017 -0700 Committer: Junping Du <junping...@apache.org> Committed: Thu Aug 31 18:10:33 2017 -0700 ---------------------------------------------------------------------- .../containermanager/ContainerManagerImpl.java | 7 ++ .../recovery/NMLeveldbStateStoreService.java | 19 +++++- .../recovery/NMStateStoreService.java | 19 +++++- .../TestNMLeveldbStateStoreService.java | 69 ++++++++++++++++++++ 4 files changed, 112 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f881586..942f7ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -144,6 +144,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; @@ -350,6 +351,12 @@ public class ContainerManagerImpl extends CompositeService implements credentials, metrics, token, context, rcs); context.getContainers().put(containerId, container); app.handle(new ApplicationContainerInitEvent(container)); + if (rcs.getRecoveryType() == RecoveredContainerType.KILL) { + dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED, + "Due to invalid StateStore info container was killed" + + " during recovery")); + } } else { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { LOG.warn(containerId + " has no corresponding application!"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index f5a36f2..0408de2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -70,6 +70,8 @@ import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteBatch; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; public class NMLeveldbStateStoreService extends NMStateStoreService { @@ -134,6 +136,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private boolean isNewlyCreated; private Timer compactionTimer; + /** + * Map of containerID vs List of unknown key suffixes. 
+ */ + private ListMultimap<ContainerId, String> containerUnknownKeySuffixes = + ArrayListMultimap.create(); + public NMLeveldbStateStoreService() { super(NMLeveldbStateStoreService.class.getName()); } @@ -251,7 +259,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { rcs.capability = new ResourcePBImpl( ResourceProto.parseFrom(entry.getValue())); } else { - throw new IOException("Unexpected container state key: " + key); + LOG.warn("the container " + containerId + + " will be killed because of the unknown key " + key + + " during recovery."); + containerUnknownKeySuffixes.put(containerId, suffix); + rcs.setRecoveryType(RecoveredContainerType.KILL); } } return rcs; @@ -380,6 +392,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); + List<String> unknownKeysForContainer = + containerUnknownKeySuffixes.removeAll(containerId); + for (String unknownKeySuffix : unknownKeysForContainer) { + batch.delete(bytes(keyPrefix + unknownKeySuffix)); + } db.write(batch); } finally { batch.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 9fca5be..7eb8763 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -59,6 +59,13 @@ public abstract class NMStateStoreService extends AbstractService { } + /** + * Type of post recovery action. + */ + public enum RecoveredContainerType { + KILL, RECOVER + } + public enum RecoveredContainerStatus { REQUESTED, LAUNCHED, @@ -73,7 +80,9 @@ public abstract class NMStateStoreService extends AbstractService { StartContainerRequest startRequest; Resource capability; int version; - + private RecoveredContainerType recoveryType = + RecoveredContainerType.RECOVER; + public RecoveredContainerStatus getStatus() { return status; } @@ -113,6 +122,14 @@ public abstract class NMStateStoreService extends AbstractService { .append(", StartRequest: ").append(getStartRequest()) .toString(); } + + public RecoveredContainerType getRecoveryType() { + return recoveryType; + } + + public void setRecoveryType(RecoveredContainerType recoveryType) { + this.recoveryType = recoveryType; + } } public static class LocalResourceTrackerState { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index c5f05f1..27e8b78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState; @@ -923,6 +924,74 @@ public class TestNMLeveldbStateStoreService { store.close(); } + @Test + public void testUnexpectedKeyDoesntThrowException() throws IOException { + // test empty when no state + List<RecoveredContainerState> recoveredContainers = stateStore + .loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + // create a container request + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 4); + ContainerId containerId = 
ContainerId.newContainerId(appAttemptId, 5); + LocalResource lrsrc = LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put("rsrc", lrsrc); + Map<String, String> env = new HashMap<String, String>(); + env.put("somevar", "someval"); + List<String> containerCmds = new ArrayList<String>(); + containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = ByteBuffer + .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, + serviceData, containerTokens, acls); + Resource containerRsrc = Resource.newInstance(1357, 3); + ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier( + containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, + Priority.newInstance(7), 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, + containerToken); + + stateStore.storeContainer(containerId, 0, containerReq); + + // add a invalid key + byte[] invalidKey = ("ContainerManager/containers/" + + containerId.toString() + "/invalidKey1234").getBytes(); + stateStore.getDB().put(invalidKey, new byte[1]); + restartStateStore(); + recoveredContainers = 
stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType()); + // assert unknown keys are cleaned up finally + assertNotNull(stateStore.getDB().get(invalidKey)); + stateStore.removeContainer(containerId); + assertNull(stateStore.getDB().get(invalidKey)); + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org