This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 7dee9977a Add new SamzaApplicationMaster metric to track allocated
containers buffered in AM (#1677)
7dee9977a is described below
commit 7dee9977a4cc689d80376c1f53817af90aa5accb
Author: jia-gao <[email protected]>
AuthorDate: Tue Jul 18 13:13:59 2023 -0700
Add new SamzaApplicationMaster metric to track allocated containers
buffered in AM (#1677)
* Add a new SamzaApplicationMaster metric, allocated-containers-in-buffer, to track
containers allocated by the RM and buffered in the AM
* Update TestApplicationMasterRestClient to expect the new metric
* Document allocated-containers-in-buffer in the metrics table
---
.../versioned/container/metrics-table.html | 4 +++
.../samza/job/yarn/YarnClusterResourceManager.java | 2 ++
.../samza/job/yarn/SamzaAppMasterMetrics.scala | 9 +++++
.../job/yarn/TestYarnClusterResourceManager.java | 41 ++++++++++++++++++++++
.../webapp/TestApplicationMasterRestClient.java | 3 +-
5 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index e130fb085..682947392 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -523,6 +523,10 @@
<td>container-from-previous-attempt</td>
<td>Number of containers carried from previous attempt in YARN. The
metrics is applicable only when Application Master High Availability is
enabled</td>
</tr>
+ <tr>
+ <td>allocated-containers-in-buffer</td>
+ <td>Number of containers allocated by the RM and buffered in the
AM</td>
+ </tr>
<tr>
<th colspan="2" class="section"
id="kafka-system-consumer-metrics">org.apache.samza.system.kafka.KafkaSystemConsumerMetrics</th>
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index c07a45b9a..2436a8150 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -298,6 +298,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
}
amClient.releaseAssignedContainer(container.getId());
allocatedResources.remove(resource);
+ metrics.decrementAllocatedContainersInBuffer();
}
}
@@ -508,6 +509,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
SamzaResource resource = new SamzaResource(numCores, memory, host,
containerId);
allocatedResources.put(resource, container);
resources.add(resource);
+ metrics.incrementAllocatedContainersInBuffer();
}
clusterManagerCallback.onResourcesAvailable(resources);
}
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index fb540f77f..7f125964c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -42,6 +42,7 @@ class SamzaAppMasterMetrics(val config: Config,
private val metricsConfig = new MetricsConfig(config)
val containersFromPreviousAttempts =
newGauge("container-from-previous-attempt", 0L)
+ val allocatedContainersInBuffer = newGauge("allocated-containers-in-buffer",
0L)
val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig,
SamzaAppMasterMetrics.sourceName).asScala
reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName,
registry))
@@ -49,6 +50,14 @@ class SamzaAppMasterMetrics(val config: Config,
containersFromPreviousAttempts.set(containerCount)
}
+ def incrementAllocatedContainersInBuffer(): Unit = {
+ allocatedContainersInBuffer.set(allocatedContainersInBuffer.getValue + 1)
+ }
+
+ def decrementAllocatedContainersInBuffer(): Unit = {
+ allocatedContainersInBuffer.set(allocatedContainersInBuffer.getValue - 1)
+ }
+
def start() {
val mRunningContainers = newGauge("running-containers", () =>
state.runningProcessors.size)
val mNeededContainers = newGauge("needed-containers", () =>
state.neededProcessors.get())
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index c7ae46f97..b17ccdb61 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.job.yarn;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
@@ -250,4 +251,44 @@ public class TestYarnClusterResourceManager {
verify(asyncNMClient, times(1)).stopContainerAsync(runningContainerId,
runningNodeId);
}
+
+ @Test
+ public void testIncrementAllocatedContainersInBuffer() {
+ YarnClusterResourceManager yarnClusterResourceManager =
+ new YarnClusterResourceManager(asyncClient, asyncNMClient, callback,
yarnAppState, lifecycle, service, metrics,
+ yarnConfiguration, config);
+ yarnClusterResourceManager.start();
+ Container allocatedContainer = mock(Container.class);
+ ContainerId allocatedContainerId = mock(ContainerId.class);
+ NodeId allocatedNodeId = mock(NodeId.class);
+ Resource resource = mock(Resource.class);
+
+ when(allocatedNodeId.getHost()).thenReturn("fake_host");
+ when(resource.getVirtualCores()).thenReturn(1);
+ when(resource.getMemory()).thenReturn(1024);
+ when(allocatedContainer.getId()).thenReturn(allocatedContainerId);
+ when(allocatedContainer.getNodeId()).thenReturn(allocatedNodeId);
+ when(allocatedContainer.getResource()).thenReturn(resource);
+
yarnClusterResourceManager.onContainersAllocated(ImmutableList.of(allocatedContainer));
+ verify(metrics).incrementAllocatedContainersInBuffer();
+ }
+ @Test
+ public void testDecrementAllocatedContainersInBuffer() {
+ YarnClusterResourceManager yarnClusterResourceManager =
+ new YarnClusterResourceManager(asyncClient, asyncNMClient, callback,
yarnAppState, lifecycle, service, metrics,
+ yarnConfiguration, config);
+ yarnClusterResourceManager.start();
+ SamzaResource allocatedContainerResource = mock(SamzaResource.class);
+ Container runningContainer = mock(Container.class);
+ ContainerId runningContainerId = mock(ContainerId.class);
+ NodeId runningNodeId = mock(NodeId.class);
+
+ when(runningContainer.getId()).thenReturn(runningContainerId);
+ when(runningContainer.getNodeId()).thenReturn(runningNodeId);
+
+
yarnClusterResourceManager.getAllocatedResources().put(allocatedContainerResource,
runningContainer);
+ yarnClusterResourceManager.releaseResources(allocatedContainerResource);
+ verify(metrics).decrementAllocatedContainersInBuffer();
+ }
+
}
\ No newline at end of file
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
index f60f232e6..b3c0affea 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
@@ -108,7 +108,7 @@ public class TestApplicationMasterRestClient {
assertTrue(metricsResult.containsKey(group));
Map<String, Object> amMetricsGroup = metricsResult.get(group);
- assertEquals(8, amMetricsGroup.size());
+ assertEquals(9, amMetricsGroup.size());
assertEquals(samzaAppState.runningProcessors.size(),
amMetricsGroup.get("running-containers"));
assertEquals(samzaAppState.neededProcessors.get(),
amMetricsGroup.get("needed-containers"));
assertEquals(samzaAppState.completedProcessors.get(),
amMetricsGroup.get("completed-containers"));
@@ -117,6 +117,7 @@ public class TestApplicationMasterRestClient {
assertEquals(samzaAppState.processorCount.get(),
amMetricsGroup.get("container-count"));
assertEquals(samzaAppState.jobHealthy.get() ? 1 : 0,
amMetricsGroup.get("job-healthy"));
assertEquals(0, amMetricsGroup.get("container-from-previous-attempt"));
+ assertEquals(0, amMetricsGroup.get("allocated-containers-in-buffer"));
}
@Test