This is an automated email from the ASF dual-hosted git repository. jhung pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push: new 655154c YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg. 655154c is described below commit 655154cb458a8db1d5b35f6400d54d3c8fb72c0c Author: Miklos Szegedi <szege...@apache.org> AuthorDate: Tue Jan 2 18:03:04 2018 -0800 YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg. (cherry picked from commit 7f515f57ede74dae787994f37bfafd5d20c9aa4c) --- .../yarn/server/nodemanager/NodeManager.java | 1 + .../recovery/NMLeveldbStateStoreService.java | 72 ++++++++++++++++++++++ .../nodemanager/recovery/NMStateStoreService.java | 11 ++++ .../recovery/TestNMLeveldbStateStoreService.java | 35 +++++++++++ 4 files changed, 119 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 30346e0..a9bc022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -456,6 +456,7 @@ public class NodeManager extends CompositeService // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + nmStore.setNodeStatusUpdater(nodeStatusUpdater); // Do secure login before calling init for added services. try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 0cbf078..49c2764 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; @@ -158,6 +159,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private DB db; private boolean isNewlyCreated; + private boolean isHealthy; private Timer compactionTimer; /** @@ -172,6 +174,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { @Override protected void startStorage() throws IOException { + // Assume that we're healthy when we start + isHealthy = true; } @Override @@ -190,6 +194,36 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { return isNewlyCreated; } + /** + * If the state store throws an error after recovery has been performed + * then we can not trust it any more to reflect the NM state. We need to + * mark the store and node unhealthy. + * Errors during the recovery will cause a service failure and thus a NM + * start failure. Do not need to mark the store unhealthy for those. + * @param dbErr Exception + */ + private void markStoreUnHealthy(DBException dbErr) { + // Always log the error here, we might not see the error in the caller + LOG.error("Statestore exception: ", dbErr); + // We have already been marked unhealthy so no need to do it again. + if (!isHealthy) { + return; + } + // Mark unhealthy, an out of band heartbeat will be sent and the state + // will remain unhealthy (not recoverable). + // No need to close the store: does not make any difference at this point. + isHealthy = false; + // We could get here before the nodeStatusUpdater is set + NodeStatusUpdater nsu = getNodeStatusUpdater(); + if (nsu != null) { + nsu.reportException(dbErr); + } + } + + @VisibleForTesting + boolean isHealthy() { + return isHealthy; + } @Override public List<RecoveredContainerState> loadContainersState() @@ -362,6 +396,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { db.write(batch); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -386,6 +421,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -401,6 +437,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -416,6 +453,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -432,6 +470,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -449,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), bytes(diagnostics.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -467,6 +507,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -496,6 +537,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -512,6 +554,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -528,6 +571,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), bytes(Integer.toString(exitCode))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -540,6 +584,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -552,6 +597,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), bytes(workDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -564,6 +610,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), bytes(logDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -597,6 +644,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -646,6 +694,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), p.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -667,6 +716,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -823,6 +873,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -846,6 +897,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -869,6 +921,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -934,6 +987,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), taskProto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -944,6 +998,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1017,6 +1072,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1031,6 +1087,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(dbKey), pb.getProto().toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1104,6 +1161,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), bytes(expTime.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1115,6 +1173,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1165,6 +1224,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1175,6 +1235,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1207,6 +1268,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } @@ -1370,6 +1432,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(dbkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } return; @@ -1384,6 +1447,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.put(bytes(fullkey), data); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1395,6 +1459,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { try { db.delete(bytes(fullkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1418,6 +1483,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { candidates.add(key); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } finally { if (iter != null) { @@ -1431,6 +1497,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { db.delete(bytes(key)); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1552,6 +1619,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { return db; } + @VisibleForTesting + void setDB(DB testDb) { + this.db = testDb; + } + /** * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. * 2) Any incompatible change of state-store is a major upgrade, and any diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 350f242..bedf2a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -51,10 +52,20 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Reso @Unstable public abstract class NMStateStoreService extends AbstractService { + private NodeStatusUpdater nodeStatusUpdater = null; + public NMStateStoreService(String name) { super(name); } + protected NodeStatusUpdater getNodeStatusUpdater() { + return nodeStatusUpdater; + } + + public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } + public static class RecoveredApplicationsState { List<ContainerManagerApplicationProto> applications; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 20c5240..ea2cb2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestNMLeveldbStateStoreService { private static final File TMP_DIR = new File( @@ -1185,6 +1188,38 @@ public class TestNMLeveldbStateStoreService { resourceMappings.getAssignedResources("numa").equals(numaRes)); } + @Test + public void testStateStoreNodeHealth() throws IOException { + // keep the working DB clean, break a temp DB + DB keepDB = stateStore.getDB(); + DB myMocked = mock(DB.class); + stateStore.setDB(myMocked); + + ApplicationId appId = ApplicationId.newInstance(1234, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + DBException toThrow = new DBException(); + Mockito.doThrow(toThrow).when(myMocked). + put(any(byte[].class), any(byte[].class)); + // write some data + try { + // chosen a simple method could be any of the "void" methods + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + stateStore.storeContainerKilled(containerId); + } catch (IOException ioErr) { + // Cause should be wrapped DBException + assertTrue(ioErr.getCause() instanceof DBException); + // check the store is marked unhealthy + assertFalse("Statestore should have been unhealthy", + stateStore.isHealthy()); + return; + } finally { + // restore the working DB + stateStore.setDB(keepDB); + } + Assert.fail("Expected exception not thrown"); + } + private StartContainerRequest storeMockContainer(ContainerId containerId) throws IOException { // create a container request --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org