Repository: incubator-samza Updated Branches: refs/heads/master ca01da75b -> 87ce08e02
SAMZA-408; expose health check metric in yarn am Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/87ce08e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/87ce08e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/87ce08e0 Branch: refs/heads/master Commit: 87ce08e028515855dfd381858acfc836d6ecd48f Parents: ca01da7 Author: David Chen <[email protected]> Authored: Thu Sep 11 15:30:29 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu Sep 11 15:30:29 2014 -0700 ---------------------------------------------------------------------- .../samza/job/yarn/SamzaAppMasterLifecycle.scala | 1 + .../samza/job/yarn/SamzaAppMasterMetrics.scala | 1 + .../samza/job/yarn/SamzaAppMasterState.scala | 1 + .../samza/job/yarn/SamzaAppMasterTaskManager.scala | 5 +++++ .../job/yarn/TestSamzaAppMasterTaskManager.scala | 17 ++++++++++++++++- 5 files changed, 24 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala index 3d17632..55ff0a8 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala @@ -52,6 +52,7 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza error(shutdownMessage) validResourceRequest = false state.status = FinalApplicationStatus.FAILED + state.jobHealthy = false } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala index 09b1237..52ede8d 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -75,6 +75,7 @@ class SamzaAppMasterMetrics( val mTrackingPort = newGauge("http-port", () => state.trackingPort) val mRpcPort = newGauge("rpc-port", () => state.rpcPort) val mAppAttemptId = newGauge("app-attempt-id", () => state.appAttemptId.toString) + val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy) 1 else 0) jvm.start reporters.values.foreach(_.start) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala index 471eff4..3872a84 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala @@ -42,6 +42,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod var runningTasks = Map[Int, YarnContainer]() var taskToTaskNames = Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]() var status = FinalApplicationStatus.UNDEFINED + var jobHealthy = true // controlled by the service var trackingPort = 0 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala index ee08cfb..5b6cc81 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala @@ -133,6 +133,9 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)) state.neededContainers -= 1 + if (state.neededContainers == 0) { + state.jobHealthy = true + } state.runningTasks += taskId -> new YarnContainer(container) state.unclaimedTasks -= taskId state.taskToTaskNames += taskId -> sspTaskNames.getJavaFriendlyType @@ -193,6 +196,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA info("Released container %s was assigned task ID %s. Requesting a new container for the task." format (containerIdStr, taskId.get)) state.neededContainers += 1 + state.jobHealthy = false state.unclaimedTasks += taskId.get // request a new container @@ -203,6 +207,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA info("Container %s failed with exit code %d - %s." format (containerIdStr, containerStatus.getExitStatus, containerStatus.getDiagnostics)) state.failedContainers += 1 + state.jobHealthy = false taskId match { case Some(taskId) => http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index 685620f..8cfdbe0 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -166,6 +166,7 @@ class TestSamzaAppMasterTaskManager { assert(taskManager.shouldShutdown == true) assert(state.completedTasks == 1) assert(state.taskCount == 1) + assert(state.jobHealthy) assert(state.status.equals(FinalApplicationStatus.SUCCEEDED)) } @@ -185,16 +186,22 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container2)) taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here")) assert(taskManager.shouldShutdown == false) + assertFalse(state.jobHealthy) + // 2. First is from onInit, second is from onContainerCompleted, since it failed. assertEquals(2, amClient.getClient.requests.size) assertEquals(0, amClient.getClient.getRelease.size) assertFalse(taskManager.shouldShutdown) - // Now trigger an AM shutdown since our retry count is 1, and we're failing twice + + // 3. Now trigger an AM shutdown since our retry count is 1, and we're failing twice taskManager.onContainerAllocated(getContainer(container2)) + assert(state.jobHealthy) taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here")) assertEquals(2, amClient.getClient.requests.size) assertEquals(0, amClient.getClient.getRelease.size) + assertFalse(state.jobHealthy) assertTrue(taskManager.shouldShutdown) + assert(state.status.equals(FinalApplicationStatus.FAILED)) } @Test @@ -261,8 +268,16 @@ class TestSamzaAppMasterTaskManager { assert(taskManager.shouldShutdown == false) taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure")) assert(taskManager.shouldShutdown == false) + assert(state.jobHealthy == false) assert(amClient.getClient.requests.size == 1) assert(amClient.getClient.getRelease.size == 0) + + taskManager.onContainerAllocated(getContainer(container2)) + assert(state.neededContainers == 0) + assert(state.jobHealthy) + assert(state.runningTasks.size == 1) + assert(state.taskToTaskNames.size == 1) + assert(state.unclaimedTasks.size == 0) } @Test
