This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dee9977a Add new SamzaApplicationMaster metric to track allocated containers buffered in AM (#1677)
7dee9977a is described below

commit 7dee9977a4cc689d80376c1f53817af90aa5accb
Author: jia-gao <[email protected]>
AuthorDate: Tue Jul 18 13:13:59 2023 -0700

    Add new SamzaApplicationMaster metric to track allocated containers buffered in AM (#1677)
    
    * Add new SamzaApplicationMaster metric to track containers allocated by RM and buffered in AM
    
    * update TestApplicationMasterRestClient
    
    * Add allocated-containers-in-buffer in metrics doc
---
 .../versioned/container/metrics-table.html         |  4 +++
 .../samza/job/yarn/YarnClusterResourceManager.java |  2 ++
 .../samza/job/yarn/SamzaAppMasterMetrics.scala     |  9 +++++
 .../job/yarn/TestYarnClusterResourceManager.java   | 41 ++++++++++++++++++++++
 .../webapp/TestApplicationMasterRestClient.java    |  3 +-
 5 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index e130fb085..682947392 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -523,6 +523,10 @@
         <td>container-from-previous-attempt</td>
         <td>Number of containers carried from previous attempt in YARN. The metrics is applicable only when Application Master High Availability is enabled</td>
     </tr>
+    <tr>
+        <td>allocated-containers-in-buffer</td>
+        <td>Number of containers allocated by the RM and buffered in the AM</td>
+    </tr>
 
     <tr>
         <th colspan="2" class="section" id="kafka-system-consumer-metrics">org.apache.samza.system.kafka.KafkaSystemConsumerMetrics</th>
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index c07a45b9a..2436a8150 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -298,6 +298,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
       }
       amClient.releaseAssignedContainer(container.getId());
       allocatedResources.remove(resource);
+      metrics.decrementAllocatedContainersInBuffer();
     }
   }
 
@@ -508,6 +509,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
       SamzaResource resource = new SamzaResource(numCores, memory, host, containerId);
       allocatedResources.put(resource, container);
       resources.add(resource);
+      metrics.incrementAllocatedContainersInBuffer();
     }
     clusterManagerCallback.onResourcesAvailable(resources);
   }
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index fb540f77f..7f125964c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -42,6 +42,7 @@ class SamzaAppMasterMetrics(val config: Config,
 
   private val metricsConfig = new MetricsConfig(config)
   val containersFromPreviousAttempts = newGauge("container-from-previous-attempt", 0L)
+  val allocatedContainersInBuffer = newGauge("allocated-containers-in-buffer", 0L)
   val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName).asScala
   reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
 
@@ -49,6 +50,14 @@ class SamzaAppMasterMetrics(val config: Config,
     containersFromPreviousAttempts.set(containerCount)
   }
 
+  def incrementAllocatedContainersInBuffer(): Unit = {
+    allocatedContainersInBuffer.set(allocatedContainersInBuffer.getValue + 1)
+  }
+
+  def decrementAllocatedContainersInBuffer(): Unit = {
+    allocatedContainersInBuffer.set(allocatedContainersInBuffer.getValue - 1)
+  }
+
   def start() {
     val mRunningContainers = newGauge("running-containers", () => state.runningProcessors.size)
     val mNeededContainers = newGauge("needed-containers", () => state.neededProcessors.get())
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index c7ae46f97..b17ccdb61 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.yarn;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
@@ -250,4 +251,44 @@ public class TestYarnClusterResourceManager {
 
     verify(asyncNMClient, times(1)).stopContainerAsync(runningContainerId, runningNodeId);
   }
+
+  @Test
+  public void testIncrementAllocatedContainersInBuffer() {
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+    yarnClusterResourceManager.start();
+    Container allocatedContainer = mock(Container.class);
+    ContainerId allocatedContainerId = mock(ContainerId.class);
+    NodeId allocatedNodeId = mock(NodeId.class);
+    Resource resource = mock(Resource.class);
+
+    when(allocatedNodeId.getHost()).thenReturn("fake_host");
+    when(resource.getVirtualCores()).thenReturn(1);
+    when(resource.getMemory()).thenReturn(1024);
+    when(allocatedContainer.getId()).thenReturn(allocatedContainerId);
+    when(allocatedContainer.getNodeId()).thenReturn(allocatedNodeId);
+    when(allocatedContainer.getResource()).thenReturn(resource);
+    yarnClusterResourceManager.onContainersAllocated(ImmutableList.of(allocatedContainer));
+    verify(metrics).incrementAllocatedContainersInBuffer();
+  }
+  @Test
+  public void testDecrementAllocatedContainersInBuffer() {
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+    yarnClusterResourceManager.start();
+    SamzaResource allocatedContainerResource = mock(SamzaResource.class);
+    Container runningContainer = mock(Container.class);
+    ContainerId runningContainerId = mock(ContainerId.class);
+    NodeId runningNodeId = mock(NodeId.class);
+
+    when(runningContainer.getId()).thenReturn(runningContainerId);
+    when(runningContainer.getNodeId()).thenReturn(runningNodeId);
+
+    yarnClusterResourceManager.getAllocatedResources().put(allocatedContainerResource, runningContainer);
+    yarnClusterResourceManager.releaseResources(allocatedContainerResource);
+    verify(metrics).decrementAllocatedContainersInBuffer();
+  }
+
 }
\ No newline at end of file
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
index f60f232e6..b3c0affea 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
@@ -108,7 +108,7 @@ public class TestApplicationMasterRestClient {
     assertTrue(metricsResult.containsKey(group));
 
     Map<String, Object> amMetricsGroup = metricsResult.get(group);
-    assertEquals(8, amMetricsGroup.size());
+    assertEquals(9, amMetricsGroup.size());
     assertEquals(samzaAppState.runningProcessors.size(),  amMetricsGroup.get("running-containers"));
     assertEquals(samzaAppState.neededProcessors.get(),    amMetricsGroup.get("needed-containers"));
     assertEquals(samzaAppState.completedProcessors.get(), amMetricsGroup.get("completed-containers"));
@@ -117,6 +117,7 @@ public class TestApplicationMasterRestClient {
     assertEquals(samzaAppState.processorCount.get(),      amMetricsGroup.get("container-count"));
     assertEquals(samzaAppState.jobHealthy.get() ? 1 : 0,  amMetricsGroup.get("job-healthy"));
     assertEquals(0, amMetricsGroup.get("container-from-previous-attempt"));
+    assertEquals(0, amMetricsGroup.get("allocated-containers-in-buffer"));
   }
 
   @Test

Reply via email to