keith-turner commented on code in PR #5798:
URL: https://github.com/apache/accumulo/pull/5798#discussion_r2291909146
##########
test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java:
##########
@@ -232,6 +258,186 @@ public void confirmMetricsPublished() throws Exception {
cluster.stop();
}
+ @Test
+ public void testFateExecutorMetrics() throws Exception {
+ // Tests metrics for Fate's thread pools. Tests that metrics are seen as
expected, and config
+ // changes to the thread pools are accurately reflected in the metrics.
This includes checking
+ // that old thread pool metrics are removed, new ones are created, size
changes to thread
+ // pools are reflected, and the ops assigned and instance type tags are
seen as expected
+ final String table = getUniqueNames(1)[0];
+
+ // prevent any system initiated fate operations from running, which may
interfere with our
+ // metrics gathering
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+ try {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
+ client.tableOperations().create(table);
+
+ SortedSet<Text> splits = new TreeSet<>();
+ splits.add(new Text("foo"));
+ // initiate 1 USER fate op which will execute slowly
+ client.tableOperations().addSplits(table, splits);
+ // initiate 1 META fate op which will execute slowly
+ client.tableOperations().addSplits(SystemTables.METADATA.tableName(),
splits);
+
+ final AtomicBoolean sawExpectedInactiveUserThreads = new
AtomicBoolean(false);
+ final AtomicBoolean sawTotalThreadsUserMetric = new
AtomicBoolean(false);
+ final AtomicBoolean sawExpectedInactiveMetaThreads = new
AtomicBoolean(false);
+ final AtomicBoolean sawTotalThreadsMetaMetric = new
AtomicBoolean(false);
+ // should see 1 thread occupied for the pool for each FATE instance:
+ // inactive = configured size - num fate ops initiated
+ // total = configured size
+ Wait.waitFor(() -> {
+ for (var line : sink.getLines()) {
+ TestStatsDSink.Metric metric =
TestStatsDSink.parseStatsDMetric(line);
+ // if the metric is one of the fate executor metrics...
+ if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+ || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+ var tags = metric.getTags();
+ var instanceType = FateInstanceType
+
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+ var opsAssigned =
tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+ verifyFateMetricTags(opsAssigned, instanceType);
+
+ if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+ assertEquals(numFateThreadsPool1,
Integer.parseInt(metric.getValue()));
+ if (instanceType == FateInstanceType.USER) {
+ sawTotalThreadsUserMetric.set(true);
+ } else if (instanceType == FateInstanceType.META) {
+ sawTotalThreadsMetaMetric.set(true);
+ }
+ } else if
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+ if (instanceType == FateInstanceType.USER
+ && (Integer.parseInt(metric.getValue()) ==
numFateThreadsPool1 - 1)) {
+ sawExpectedInactiveUserThreads.set(true);
+ } else if (instanceType == FateInstanceType.META
+ && (Integer.parseInt(metric.getValue()) ==
numFateThreadsPool1 - 1)) {
+ sawExpectedInactiveMetaThreads.set(true);
+ }
+ }
+ }
+ }
+
+ log.debug("sawExpectedInactiveUserThreads: " +
sawExpectedInactiveUserThreads.get());
+ log.debug("sawExpectedInactiveMetaThreads: " +
sawExpectedInactiveMetaThreads.get());
+ log.debug("sawTotalThreadsUserMetric: " +
sawTotalThreadsUserMetric.get());
+ log.debug("sawTotalThreadsMetaMetric: " +
sawTotalThreadsMetaMetric.get());
+ return sawExpectedInactiveUserThreads.get() &&
sawExpectedInactiveMetaThreads.get()
+ && sawTotalThreadsUserMetric.get() &&
sawTotalThreadsMetaMetric.get();
+ });
+
+ // Now change the config from:
+ // {<all fate ops>: numFateThreadsPool1}
+ // ->
+ // {<all fate ops except split>: numFateThreadsPool2,
+ // <split operation>: numFateThreadsPool3}
+ changeFateConfig(client, FateInstanceType.USER);
+ changeFateConfig(client, FateInstanceType.META);
+
+ // Wait for config changes to get picked up by FATE. Can do so by
waiting for metrics to no
+ // longer include metrics of the first pool
+ Wait.waitFor(() -> {
+ boolean metricExistsForPool1 = false;
+ for (var line : sink.getLines()) {
Review Comment:
Maybe we could combine the test code that looks for the new stuff with the
code that checks for the absence of the old stuff. For example, if we see new
stuff and no old stuff two or three times in a row, then it's probably good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]