arjunashok commented on code in PR #249:
URL: https://github.com/apache/cassandra-sidecar/pull/249#discussion_r2320063833
##########
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java:
##########
@@ -256,4 +280,280 @@ void assertClientStatsResponse(HttpResponse<Buffer>
response, Map<String, Boolea
}
}
}
+
+ @Test
+ void testCompactionStatsRetrieval()
+ {
+ logger.info("Starting compaction stats test with {} tables",
COMPACTION_TEST_TABLES.size());
+
+ // Generate SSTables for all test tables
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ generateSSTables(tableName, 100);
+ }
+
+ // Create threads to trigger compaction on all tables
+ List<Thread> compactionThreads = new ArrayList<>();
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ Thread thread = new Thread(() ->
triggerCompactionForTable(tableName));
+ compactionThreads.add(thread);
+ }
+
+ // Start all compaction threads
+ for (Thread thread : compactionThreads)
+ {
+ thread.start();
+ }
+
+ // Poll immediately and repeatedly to catch active compactions
+ CompactionStatsResponse stats = null;
+ HttpResponse<Buffer> response;
+ boolean foundActiveCompactions;
+
+ for (int attempt = 0; attempt < MAX_POLL_ATTEMPTS; attempt++)
+ {
+ try
+ {
+ response = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
COMPACTION_STATS_ROUTE)
+ .send()
+ .expecting(HttpResponseExpectation.SC_OK));
+
+ stats = response.bodyAsJson(CompactionStatsResponse.class);
+ foundActiveCompactions = !stats.activeCompactions().isEmpty();
+
+ if (foundActiveCompactions)
+ {
+ logger.info("SUCCESS: Found {} active compactions on
attempt {}",
+ stats.activeCompactionsCount(), attempt + 1);
+ break;
+ }
+ else
+ {
+ logger.info("Attempt {}: No active compactions yet",
attempt + 1);
+ }
+
+ Thread.sleep(100); // Short sleep between attempts
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // Wait for all compaction threads to complete
+ for (Thread thread : compactionThreads)
+ {
+ try
+ {
+ thread.join(5000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ assertThat(stats).isNotNull();
+ logger.info("Response:{}", stats);
Review Comment:
Nit: Serialize the response before logging it; as written, the log line only prints the default object identity
(`Response:org.apache.cassandra.sidecar.common.response.CompactionStatsResponse@2a9d6a9d`)
##########
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java:
##########
@@ -256,4 +280,280 @@ void assertClientStatsResponse(HttpResponse<Buffer>
response, Map<String, Boolea
}
}
}
+
+ @Test
+ void testCompactionStatsRetrieval()
+ {
+ logger.info("Starting compaction stats test with {} tables",
COMPACTION_TEST_TABLES.size());
+
+ // Generate SSTables for all test tables
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ generateSSTables(tableName, 100);
+ }
+
+ // Create threads to trigger compaction on all tables
+ List<Thread> compactionThreads = new ArrayList<>();
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ Thread thread = new Thread(() ->
triggerCompactionForTable(tableName));
+ compactionThreads.add(thread);
+ }
+
+ // Start all compaction threads
+ for (Thread thread : compactionThreads)
+ {
+ thread.start();
+ }
+
+ // Poll immediately and repeatedly to catch active compactions
+ CompactionStatsResponse stats = null;
+ HttpResponse<Buffer> response;
+ boolean foundActiveCompactions;
+
+ for (int attempt = 0; attempt < MAX_POLL_ATTEMPTS; attempt++)
+ {
+ try
+ {
+ response = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
COMPACTION_STATS_ROUTE)
+ .send()
+ .expecting(HttpResponseExpectation.SC_OK));
+
+ stats = response.bodyAsJson(CompactionStatsResponse.class);
+ foundActiveCompactions = !stats.activeCompactions().isEmpty();
+
+ if (foundActiveCompactions)
+ {
+ logger.info("SUCCESS: Found {} active compactions on
attempt {}",
+ stats.activeCompactionsCount(), attempt + 1);
+ break;
+ }
+ else
+ {
+ logger.info("Attempt {}: No active compactions yet",
attempt + 1);
+ }
+
+ Thread.sleep(100); // Short sleep between attempts
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // Wait for all compaction threads to complete
+ for (Thread thread : compactionThreads)
+ {
+ try
+ {
+ thread.join(5000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ assertThat(stats).isNotNull();
+ logger.info("Response:{}", stats);
+ validateCompactionStatsResponse(stats);
+ }
+
+
+ private void generateSSTables(QualifiedName tableName, int numSSTables)
+ {
+ for (int batch = 0; batch < numSSTables; batch++)
+ {
+ for (int i = batch * 1000; i < (batch + 1) * 1000; i++)
+ {
+ String statement = String.format("INSERT INTO %s (id, data)
VALUES (%d, '%s');",
+ tableName, i, "data" + i);
+ cluster.schemaChangeIgnoringStoppedInstances(statement);
+ }
+ cluster.stream().forEach(instance ->
instance.flush(TEST_KEYSPACE));
+ }
+ }
+
+ private void triggerCompactionForTable(QualifiedName tableName)
+ {
+ cluster.stream().forEach(instance ->
+ {
+ try
+ {
+ instance.nodetool("compact",
tableName.keyspace(), tableName.table());
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to trigger
compaction for {}: {}", tableName, e.getMessage());
+ }
+ });
+ }
+
+ private void validateCompactionStatsResponse(CompactionStatsResponse stats)
+ {
+ assertThat(stats).isNotNull();
+
+ // Basic counters validation
+ assertThat(stats.concurrentCompactors()).isGreaterThanOrEqualTo(0);
+ assertThat(stats.totalPendingTasks()).isGreaterThanOrEqualTo(0);
+ assertThat(stats.completedCompactions()).isGreaterThanOrEqualTo(0);
+ assertThat(stats.dataCompacted()).isGreaterThanOrEqualTo(0);
+ assertThat(stats.abortedCompactions()).isGreaterThanOrEqualTo(0);
+ assertThat(stats.reducedCompactions()).isGreaterThanOrEqualTo(0);
+
assertThat(stats.sstablesDroppedFromCompaction()).isGreaterThanOrEqualTo(0);
+
+ // Pending tasks validation
+ assertThat(stats.pendingTasks()).isNotNull();
+
+ // Validate each pending task entry if there are any
+ if (!stats.pendingTasks().isEmpty())
+ {
+ validatePendingTasks(stats);
+ }
+
+ // Completion rates validation
+ assertThat(stats.completedCompactionsRate()).isNotNull();
+
+ // Validate mean rate format is X.XX/hour
Review Comment:
Nit: Update the comment above so that it reflects what the check actually validates.
##########
adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionStatsOperations.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics;
+import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils;
+import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
+import org.apache.cassandra.sidecar.common.server.CompactionStatsOperations;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.data.ActiveCompactionEntryData;
+import org.apache.cassandra.sidecar.common.server.data.CompactionStatsData;
+import
org.apache.cassandra.sidecar.common.server.data.CompletedCompactionsRateData;
+
+import static
org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.BYTES_COMPACTED;
+import static
org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.COMPACTIONS_ABORTED;
+import static
org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.COMPACTIONS_REDUCED;
+import static
org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.PENDING_TASKS_BY_TABLE_NAME;
+import static
org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.SSTABLES_DROPPED_FROM_COMPACTION;
+import static
org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.TOTAL_COMPACTIONS_COMPLETED;
+import static
org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils.safeCast;
+import static
org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils.safeParseLong;
+
+/**
+ * Service responsible for fetching compaction stats from various sources
including
+ * CassandraStorageOperations, CassandraMetricsOperations, and
CompactionManagerOperations.
+ */
+public class CassandraCompactionStatsOperations implements
CompactionStatsOperations
+{
+ // Default values
+ // Default string value when actual value is unavailable
+ public static final String DEFAULT_STRING_VALUE = "";
+ // Default numeric value when actual value is unavailable
+ public static final String DEFAULT_NUMBER_VALUE = "-1";
+
+ // Constants for compaction info map keys
+ // Keyspace name being compacted
+ public static final String KEYSPACE = "keyspace";
+ // Column family (table) name being compacted
+ public static final String COLUMNFAMILY = "columnfamily";
+ // Number of bytes already processed in the compaction
+ public static final String COMPLETED = "completed";
+ // Total number of bytes to be processed in the compaction
+ public static final String TOTAL = "total";
+ // Type of compaction task (e.g., COMPACTION, VALIDATION, etc.)
+ public static final String TASK_TYPE = "taskType";
+ // Unique compaction identifier
+ public static final String COMPACTION_ID = "compactionId";
+ // Comma-separated list of SSTable names involved in the compaction
+ public static final String SSTABLES = "sstables";
+ // Directory where compaction output will be written
+ public static final String TARGET_DIRECTORY = "targetDirectory";
+
+ private final StorageOperations storageOperations;
+ private final MetricsOperations metricsOperations;
+ private final CompactionManagerOperations compactionManagerOperations;
+
+ public CassandraCompactionStatsOperations(StorageOperations
storageOperations,
+ MetricsOperations
metricsOperations,
+ CompactionManagerOperations
compactionManagerOperations)
+ {
+ this.storageOperations = storageOperations;
+ this.metricsOperations = metricsOperations;
+ this.compactionManagerOperations = compactionManagerOperations;
+ }
+
+ @Override
+ public CompactionStatsData compactionStats()
+ {
+ // Get concurrent compactors from StorageOperations
+ long concurrentCompactors =
storageOperations.getConcurrentCompactors();
+
+ // Get pending tasks grouped by keyspace and table
+ Map<String, Map<String, Integer>> pendingTasks =
getPendingCompactionTasksByTable();
+ long totalPendingTasks = pendingTasks.values().stream()
+ .mapToLong(tableMap ->
tableMap.values().stream().mapToInt(Integer::intValue).sum())
+ .sum();
+
+ // Get compaction metrics from JMX counters and meters
+ long completedCompactionsCount =
getCompactionMetricLong(TOTAL_COMPACTIONS_COMPLETED);
+ long dataCompactedBytes = getCompactionMetricLong(BYTES_COMPACTED);
+ long abortedCompactionsCount =
getCompactionMetricLong(COMPACTIONS_ABORTED);
+ long reducedCompactionsCount =
getCompactionMetricLong(COMPACTIONS_REDUCED);
+ long sstablesDroppedFromCompactionCount =
getCompactionMetricLong(SSTABLES_DROPPED_FROM_COMPACTION);
+
+ // Get completed compactions rate with proper time conversions
+ CompletedCompactionsRateData completedCompactionsRate =
metricsOperations.getCompletedCompactionsRate();
+
+ // Get active compactions with all required fields
+ List<ActiveCompactionEntryData> activeCompactions =
getActiveCompactions(compactionManagerOperations.getCompactions());
+ long activeCompactionsCount = activeCompactions.size();
+
+ // Calculate remaining time in seconds based on throughput and
remaining bytes
+ long activeCompactionsRemainingTime =
calculateRemainingTimeSeconds(activeCompactions);
+
+ return CompactionStatsData.builder()
+ .concurrentCompactors(concurrentCompactors)
+ .pendingTasks(pendingTasks)
+ .totalPendingTasks(totalPendingTasks)
+
.completedCompactions(completedCompactionsCount)
+ .dataCompacted(dataCompactedBytes)
+ .abortedCompactions(abortedCompactionsCount)
+ .reducedCompactions(reducedCompactionsCount)
+
.sstablesDroppedFromCompaction(sstablesDroppedFromCompactionCount)
+
.completedCompactionsRate(completedCompactionsRate)
+ .activeCompactions(activeCompactions)
+
.activeCompactionsCount(activeCompactionsCount)
+
.activeCompactionsRemainingTime(activeCompactionsRemainingTime)
+ .build();
+ }
+
+ private long getCompactionMetricLong(CompactionStatsMetrics metric)
Review Comment:
Nit: The return type is already evident from the method signature, so repeating it
in the method name (the `Long` suffix) is redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]