keith-turner commented on code in PR #4466: URL: https://github.com/apache/accumulo/pull/4466#discussion_r1569310633
########## test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java: ########## @@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception { shutdownTailer.set(true); thread.join(); } + + @Test + public void newTest() throws Exception { + // Metrics collector Thread + final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + + Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + thread.start(); + + long highestFileCount = 0L; + ServerContext context = getCluster().getServerContext(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + // Create splits so there are two groupings of tablets with similar file counts. + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + c.tableOperations().importDirectory(dir).to(tableName).load(); + + IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); + iterSetting.addOption("expectedQ", QUEUE1); + iterSetting.addOption("modulus", 3 + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); Review Comment: Remove this code and the test will pass. This forces a compaction regardless of what the compaction ratio is. 
```suggestion ``` ########## test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java: ########## @@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception { shutdownTailer.set(true); thread.join(); } + + @Test + public void newTest() throws Exception { + // Metrics collector Thread + final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + + Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + thread.start(); + + long highestFileCount = 0L; + ServerContext context = getCluster().getServerContext(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + // Create splits so there are two groupings of tablets with similar file counts. 
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + c.tableOperations().importDirectory(dir).to(tableName).load(); + + IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); + iterSetting.addOption("expectedQ", QUEUE1); + iterSetting.addOption("modulus", 3 + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); + + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { + // Get each tablet's file sizes + for (TabletMetadata tablet : tm) { + long fileSize = tablet.getFiles().size(); + log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); + highestFileCount = Math.max(highestFileCount, fileSize); + } + } + verifyData(c, tableName, 0, 100 * 100 - 1, false); + } + + boolean sawMetricsQ1 = false; + while (!sawMetricsQ1) { + while (!queueMetrics.isEmpty()) { + var qm = queueMetrics.take(); + if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + if (Integer.parseInt(qm.getValue()) > 0) { + sawMetricsQ1 = true; + } + } + } + // Current poll rate of the TestStatsDRegistryFactory is 3 seconds + // If metrics are not found in the queue, sleep until the next poll. + UtilWaitThread.sleep(3500); + } + + // Set lowest priority to the lowest possible system compaction priority + long lowestPriority = Short.MIN_VALUE; + long rejectedCount = 0L; + int queueSize = 0; + + boolean sawQueues = false; + // An empty queue means that the last known value is the most recent. 
+ while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + rejectedCount = Long.parseLong(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + queueSize = Integer.parseInt(metric.getValue()); + } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { + sawQueues = true; + } else { + log.debug("{}", metric); + } + } + + // Confirm metrics were generated and in some cases, validate contents. + assertTrue(rejectedCount > 0L); + + // Priority is the file counts + number of compactions for that tablet. + // The lowestPriority job in the queue should have been + // at least 1 count higher than the highest file count. + short highestFileCountPrio = CompactionJobPrioritizer.createPriority( + getCluster().getServerContext().getTableId(tableName), CompactionKind.USER, Review Comment: Removing the explicit table compaction causes SYSTEM compactions to be queued instead of USER compactions. SYSTEM compactions have a different priority, so the following line needs to change.
```suggestion getCluster().getServerContext().getTableId(tableName), CompactionKind.SYSTEM, ``` ########## test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java: ########## @@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception { shutdownTailer.set(true); thread.join(); } + + @Test + public void newTest() throws Exception { + // Metrics collector Thread + final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>(); Review Comment: It would be good to ensure compactors are stopped, either at the beginning of this test or in the `@BeforeEach` function. This is needed if it's possible that another test in the class could leave them running. I think this may be the case because this test class extends SharedMiniClusterBase. ########## test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java: ########## @@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception { shutdownTailer.set(true); thread.join(); } + + @Test + public void newTest() throws Exception { + // Metrics collector Thread + final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + + Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + thread.start(); + + long highestFileCount = 0L; + ServerContext context = getCluster().getServerContext(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + // Create splits so there are two
groupings of tablets with similar file counts. + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + c.tableOperations().importDirectory(dir).to(tableName).load(); + + IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); + iterSetting.addOption("expectedQ", QUEUE1); + iterSetting.addOption("modulus", 3 + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); + + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { + // Get each tablet's file sizes + for (TabletMetadata tablet : tm) { + long fileSize = tablet.getFiles().size(); + log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); + highestFileCount = Math.max(highestFileCount, fileSize); + } + } + verifyData(c, tableName, 0, 100 * 100 - 1, false); + } + + boolean sawMetricsQ1 = false; + while (!sawMetricsQ1) { + while (!queueMetrics.isEmpty()) { + var qm = queueMetrics.take(); + if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + if (Integer.parseInt(qm.getValue()) > 0) { + sawMetricsQ1 = true; + } + } + } + // Current poll rate of the TestStatsDRegistryFactory is 3 seconds + // If metrics are not found in the queue, sleep until the next poll. + UtilWaitThread.sleep(3500); + } + + // Set lowest priority to the lowest possible system compaction priority + long lowestPriority = Short.MIN_VALUE; + long rejectedCount = 0L; + int queueSize = 0; + + boolean sawQueues = false; + // An empty queue means that the last known value is the most recent. 
+ while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + rejectedCount = Long.parseLong(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + queueSize = Integer.parseInt(metric.getValue()); + } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { + sawQueues = true; + } else { + log.debug("{}", metric); + } + } + + // Confirm metrics were generated and in some cases, validate contents. + assertTrue(rejectedCount > 0L); + + // Priority is the file counts + number of compactions for that tablet. + // The lowestPriority job in the queue should have been + // at least 1 count higher than the highest file count. + short highestFileCountPrio = CompactionJobPrioritizer.createPriority( + getCluster().getServerContext().getTableId(tableName), CompactionKind.USER, + (int) highestFileCount, 0); + assertTrue(lowestPriority > highestFileCountPrio, + lowestPriority + " " + highestFileCount + " " + highestFileCountPrio); + + // Multiple Queues have been created + assertTrue(sawQueues); + + // Queue size matches the intended queue size + assertEquals(QUEUE1_SIZE, queueSize); Review Comment: There could be race conditions where the manager sees only a subset of the tablets and temporarily reports a lower queue count. You may want to restructure the code so that it keeps looping until the queue count reaches 6. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org