Repository: incubator-samza
Updated Branches:
  refs/heads/master ca01da75b -> 87ce08e02


SAMZA-408; expose health check metric in yarn am


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/87ce08e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/87ce08e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/87ce08e0

Branch: refs/heads/master
Commit: 87ce08e028515855dfd381858acfc836d6ecd48f
Parents: ca01da7
Author: David Chen <[email protected]>
Authored: Thu Sep 11 15:30:29 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Thu Sep 11 15:30:29 2014 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/SamzaAppMasterLifecycle.scala   |  1 +
 .../samza/job/yarn/SamzaAppMasterMetrics.scala     |  1 +
 .../samza/job/yarn/SamzaAppMasterState.scala       |  1 +
 .../samza/job/yarn/SamzaAppMasterTaskManager.scala |  5 +++++
 .../job/yarn/TestSamzaAppMasterTaskManager.scala   | 17 ++++++++++++++++-
 5 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
index 3d17632..55ff0a8 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -52,6 +52,7 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza
       error(shutdownMessage)
       validResourceRequest = false
       state.status = FinalApplicationStatus.FAILED
+      state.jobHealthy = false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index 09b1237..52ede8d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -75,6 +75,7 @@ class SamzaAppMasterMetrics(
     val mTrackingPort = newGauge("http-port", () => state.trackingPort)
     val mRpcPort = newGauge("rpc-port", () => state.rpcPort)
     val mAppAttemptId = newGauge("app-attempt-id", () => state.appAttemptId.toString)
+    val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy) 1 else 0)
 
     jvm.start
     reporters.values.foreach(_.start)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 471eff4..3872a84 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -42,6 +42,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
   var runningTasks = Map[Int, YarnContainer]()
   var taskToTaskNames = Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]()
   var status = FinalApplicationStatus.UNDEFINED
+  var jobHealthy = true
 
   // controlled by the service
   var trackingPort = 0

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index ee08cfb..5b6cc81 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -133,6 +133,9 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
           "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
 
         state.neededContainers -= 1
+        if (state.neededContainers == 0) {
+          state.jobHealthy = true
+        }
         state.runningTasks += taskId -> new YarnContainer(container)
         state.unclaimedTasks -= taskId
         state.taskToTaskNames += taskId -> sspTaskNames.getJavaFriendlyType
@@ -193,6 +196,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
           info("Released container %s was assigned task ID %s. Requesting a new container for the task." format (containerIdStr, taskId.get))
 
           state.neededContainers += 1
+          state.jobHealthy = false
           state.unclaimedTasks += taskId.get
 
           // request a new container
@@ -203,6 +207,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         info("Container %s failed with exit code %d - %s." format (containerIdStr, containerStatus.getExitStatus, containerStatus.getDiagnostics))
 
         state.failedContainers += 1
+        state.jobHealthy = false
 
         taskId match {
           case Some(taskId) =>

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87ce08e0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 685620f..8cfdbe0 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -166,6 +166,7 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == true)
     assert(state.completedTasks == 1)
     assert(state.taskCount == 1)
+    assert(state.jobHealthy)
     assert(state.status.equals(FinalApplicationStatus.SUCCEEDED))
   }
 
@@ -185,16 +186,22 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container2))
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
     assert(taskManager.shouldShutdown == false)
+    assertFalse(state.jobHealthy)
+
     // 2. First is from onInit, second is from onContainerCompleted, since it failed.
     assertEquals(2, amClient.getClient.requests.size)
     assertEquals(0, amClient.getClient.getRelease.size)
     assertFalse(taskManager.shouldShutdown)
-    // Now trigger an AM shutdown since our retry count is 1, and we're failing twice
+
+    // 3. Now trigger an AM shutdown since our retry count is 1, and we're failing twice
     taskManager.onContainerAllocated(getContainer(container2))
+    assert(state.jobHealthy)
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
     assertEquals(2, amClient.getClient.requests.size)
     assertEquals(0, amClient.getClient.getRelease.size)
+    assertFalse(state.jobHealthy)
     assertTrue(taskManager.shouldShutdown)
+    assert(state.status.equals(FinalApplicationStatus.FAILED))
   }
 
   @Test
@@ -261,8 +268,16 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
     assert(taskManager.shouldShutdown == false)
+    assert(state.jobHealthy == false)
     assert(amClient.getClient.requests.size == 1)
     assert(amClient.getClient.getRelease.size == 0)
+
+    taskManager.onContainerAllocated(getContainer(container2))
+    assert(state.neededContainers == 0)
+    assert(state.jobHealthy)
+    assert(state.runningTasks.size == 1)
+    assert(state.taskToTaskNames.size == 1)
+    assert(state.unclaimedTasks.size == 0)
   }
 
   @Test

Reply via email to