keith-turner commented on code in PR #4466:
URL: https://github.com/apache/accumulo/pull/4466#discussion_r1569310633


##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  @Test
+  public void newTest() throws Exception {
+    // Metrics collector Thread
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (!shutdownTailer.get()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            break;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    long highestFileCount = 0L;
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+
+      IteratorSetting iterSetting = new IteratorSetting(100, 
CompactionIT.TestFilter.class);
+      iterSetting.addOption("expectedQ", QUEUE1);
+      iterSetting.addOption("modulus", 3 + "");
+      CompactionConfig config =
+          new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(false);
+      c.tableOperations().compact(tableName, config);

Review Comment:
   Remove this code and the test will pass.  This forces a compaction 
regardless of what the compaction ratio is.
   
   ```suggestion
   ```



##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  @Test
+  public void newTest() throws Exception {
+    // Metrics collector Thread
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (!shutdownTailer.get()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            break;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    long highestFileCount = 0L;
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+
+      IteratorSetting iterSetting = new IteratorSetting(100, 
CompactionIT.TestFilter.class);
+      iterSetting.addOption("expectedQ", QUEUE1);
+      iterSetting.addOption("modulus", 3 + "");
+      CompactionConfig config =
+          new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(false);
+      c.tableOperations().compact(tableName, config);
+
+      try (TabletsMetadata tm = 
context.getAmple().readTablets().forTable(tableId).build()) {
+        // Get each tablet's file sizes
+        for (TabletMetadata tablet : tm) {
+          long fileSize = tablet.getFiles().size();
+          log.info("Number of files in tablet {}: {}", 
tablet.getExtent().toString(), fileSize);
+          highestFileCount = Math.max(highestFileCount, fileSize);
+        }
+      }
+      verifyData(c, tableName, 0, 100 * 100 - 1, false);
+    }
+
+    boolean sawMetricsQ1 = false;
+    while (!sawMetricsQ1) {
+      while (!queueMetrics.isEmpty()) {
+        var qm = queueMetrics.take();
+        if 
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+            && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          if (Integer.parseInt(qm.getValue()) > 0) {
+            sawMetricsQ1 = true;
+          }
+        }
+      }
+      // Current poll rate of the TestStatsDRegistryFactory is 3 seconds
+      // If metrics are not found in the queue, sleep until the next poll.
+      UtilWaitThread.sleep(3500);
+    }
+
+    // Set lowest priority to the lowest possible system compaction priority
+    long lowestPriority = Short.MIN_VALUE;
+    long rejectedCount = 0L;
+    int queueSize = 0;
+
+    boolean sawQueues = false;
+    // An empty queue means that the last known value is the most recent.
+    while (!queueMetrics.isEmpty()) {
+      var metric = queueMetrics.take();
+      if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        rejectedCount = Long.parseLong(metric.getValue());
+      } else if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        lowestPriority = Math.max(lowestPriority, 
Long.parseLong(metric.getValue()));
+      } else if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        queueSize = Integer.parseInt(metric.getValue());
+      } else if 
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES))
 {
+        sawQueues = true;
+      } else {
+        log.debug("{}", metric);
+      }
+    }
+
+    // Confirm metrics were generated and in some cases, validate contents.
+    assertTrue(rejectedCount > 0L);
+
+    // Priority is the file counts + number of compactions for that tablet.
+    // The lowestPriority job in the queue should have been
+    // at least 1 count higher than the highest file count.
+    short highestFileCountPrio = CompactionJobPrioritizer.createPriority(
+        getCluster().getServerContext().getTableId(tableName), 
CompactionKind.USER,

Review Comment:
   When removing the explicit table compaction that causes SYSTEM compactions 
to be queued instead of USER and those have a diff prio so need to change the 
following.
   
   ```suggestion
           getCluster().getServerContext().getTableId(tableName), 
CompactionKind.SYSTEM,
   ```



##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  @Test
+  public void newTest() throws Exception {
+    // Metrics collector Thread
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();

Review Comment:
   At the beginning of this test or in the `@BeforeEach` function would be good 
to ensure compactors are stopped. This is  needed if its possible that another 
test in the class could leave them running.  I think this may be the case 
because this test class extends SharedMiniClusterBase.



##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  @Test
+  public void newTest() throws Exception {
+    // Metrics collector Thread
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (!shutdownTailer.get()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            break;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    long highestFileCount = 0L;
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+
+      IteratorSetting iterSetting = new IteratorSetting(100, 
CompactionIT.TestFilter.class);
+      iterSetting.addOption("expectedQ", QUEUE1);
+      iterSetting.addOption("modulus", 3 + "");
+      CompactionConfig config =
+          new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(false);
+      c.tableOperations().compact(tableName, config);
+
+      try (TabletsMetadata tm = 
context.getAmple().readTablets().forTable(tableId).build()) {
+        // Get each tablet's file sizes
+        for (TabletMetadata tablet : tm) {
+          long fileSize = tablet.getFiles().size();
+          log.info("Number of files in tablet {}: {}", 
tablet.getExtent().toString(), fileSize);
+          highestFileCount = Math.max(highestFileCount, fileSize);
+        }
+      }
+      verifyData(c, tableName, 0, 100 * 100 - 1, false);
+    }
+
+    boolean sawMetricsQ1 = false;
+    while (!sawMetricsQ1) {
+      while (!queueMetrics.isEmpty()) {
+        var qm = queueMetrics.take();
+        if 
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+            && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          if (Integer.parseInt(qm.getValue()) > 0) {
+            sawMetricsQ1 = true;
+          }
+        }
+      }
+      // Current poll rate of the TestStatsDRegistryFactory is 3 seconds
+      // If metrics are not found in the queue, sleep until the next poll.
+      UtilWaitThread.sleep(3500);
+    }
+
+    // Set lowest priority to the lowest possible system compaction priority
+    long lowestPriority = Short.MIN_VALUE;
+    long rejectedCount = 0L;
+    int queueSize = 0;
+
+    boolean sawQueues = false;
+    // An empty queue means that the last known value is the most recent.
+    while (!queueMetrics.isEmpty()) {
+      var metric = queueMetrics.take();
+      if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        rejectedCount = Long.parseLong(metric.getValue());
+      } else if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        lowestPriority = Math.max(lowestPriority, 
Long.parseLong(metric.getValue()));
+      } else if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        queueSize = Integer.parseInt(metric.getValue());
+      } else if 
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES))
 {
+        sawQueues = true;
+      } else {
+        log.debug("{}", metric);
+      }
+    }
+
+    // Confirm metrics were generated and in some cases, validate contents.
+    assertTrue(rejectedCount > 0L);
+
+    // Priority is the file counts + number of compactions for that tablet.
+    // The lowestPriority job in the queue should have been
+    // at least 1 count higher than the highest file count.
+    short highestFileCountPrio = CompactionJobPrioritizer.createPriority(
+        getCluster().getServerContext().getTableId(tableName), 
CompactionKind.USER,
+        (int) highestFileCount, 0);
+    assertTrue(lowestPriority > highestFileCountPrio,
+        lowestPriority + " " + highestFileCount + " " + highestFileCountPrio);
+
+    // Multiple Queues have been created
+    assertTrue(sawQueues);
+
+    // Queue size matches the intended queue size
+    assertEquals(QUEUE1_SIZE, queueSize);

Review Comment:
   There could be race conditions where the manager sees a subset of the 
tablets and temporarily has a lower queue count.  May want to restructure the 
code so that it loops waiting for the queue count to be 6.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to