keith-turner commented on code in PR #5798:
URL: https://github.com/apache/accumulo/pull/5798#discussion_r2291909146


##########
test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java:
##########
@@ -232,6 +258,186 @@ public void confirmMetricsPublished() throws Exception {
     cluster.stop();
   }
 
+  @Test
+  public void testFateExecutorMetrics() throws Exception {
+    // Tests metrics for Fate's thread pools. Tests that metrics are seen as 
expected, and config
+    // changes to the thread pools are accurately reflected in the metrics. 
This includes checking
+    // that old thread pool metrics are removed, new ones are created, size 
changes to thread
+    // pools are reflected, and the ops assigned and instance type tags are 
seen as expected
+    final String table = getUniqueNames(1)[0];
+
+    // prevent any system initiated fate operations from running, which may 
interfere with our
+    // metrics gathering
+    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+    
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    try {
+      try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+        client.tableOperations().create(table);
+
+        SortedSet<Text> splits = new TreeSet<>();
+        splits.add(new Text("foo"));
+        // initiate 1 USER fate op which will execute slowly
+        client.tableOperations().addSplits(table, splits);
+        // initiate 1 META fate op which will execute slowly
+        client.tableOperations().addSplits(SystemTables.METADATA.tableName(), 
splits);
+
+        final AtomicBoolean sawExpectedInactiveUserThreads = new 
AtomicBoolean(false);
+        final AtomicBoolean sawTotalThreadsUserMetric = new 
AtomicBoolean(false);
+        final AtomicBoolean sawExpectedInactiveMetaThreads = new 
AtomicBoolean(false);
+        final AtomicBoolean sawTotalThreadsMetaMetric = new 
AtomicBoolean(false);
+        // should see 1 thread occupied for the pool for each FATE instance:
+        // inactive = configured size - num fate ops initiated
+        // total = configured size
+        Wait.waitFor(() -> {
+          for (var line : sink.getLines()) {
+            TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+            // if the metric is one of the fate executor metrics...
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+                || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              var tags = metric.getTags();
+              var instanceType = FateInstanceType
+                  
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+              var opsAssigned = 
tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+              verifyFateMetricTags(opsAssigned, instanceType);
+
+              if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+                assertEquals(numFateThreadsPool1, 
Integer.parseInt(metric.getValue()));
+                if (instanceType == FateInstanceType.USER) {
+                  sawTotalThreadsUserMetric.set(true);
+                } else if (instanceType == FateInstanceType.META) {
+                  sawTotalThreadsMetaMetric.set(true);
+                }
+              } else if 
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+                if (instanceType == FateInstanceType.USER
+                    && (Integer.parseInt(metric.getValue()) == 
numFateThreadsPool1 - 1)) {
+                  sawExpectedInactiveUserThreads.set(true);
+                } else if (instanceType == FateInstanceType.META
+                    && (Integer.parseInt(metric.getValue()) == 
numFateThreadsPool1 - 1)) {
+                  sawExpectedInactiveMetaThreads.set(true);
+                }
+              }
+            }
+          }
+
+          log.debug("sawExpectedInactiveUserThreads: " + 
sawExpectedInactiveUserThreads.get());
+          log.debug("sawExpectedInactiveMetaThreads: " + 
sawExpectedInactiveMetaThreads.get());
+          log.debug("sawTotalThreadsUserMetric: " + 
sawTotalThreadsUserMetric.get());
+          log.debug("sawTotalThreadsMetaMetric: " + 
sawTotalThreadsMetaMetric.get());
+          return sawExpectedInactiveUserThreads.get() && 
sawExpectedInactiveMetaThreads.get()
+              && sawTotalThreadsUserMetric.get() && 
sawTotalThreadsMetaMetric.get();
+        });
+
+        // Now change the config from:
+        // {<all fate ops>: numFateThreadsPool1}
+        // ->
+        // {<all fate ops except split>: numFateThreadsPool2,
+        // <split operation>: numFateThreadsPool3}
+        changeFateConfig(client, FateInstanceType.USER);
+        changeFateConfig(client, FateInstanceType.META);
+
+        // Wait for config changes to get picked up by FATE. Can do so by 
waiting for metrics to no
+        // longer include metrics of the first pool
+        Wait.waitFor(() -> {
+          boolean metricExistsForPool1 = false;
+          for (var line : sink.getLines()) {

Review Comment:
   Maybe we could combine the test code that looks for the new metrics with the
code that checks for the absence of the old ones. For example, if we see the new
metrics and none of the old ones two or three times in a row, then it's probably good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to