sklaha commented on code in PR #249: URL: https://github.com/apache/cassandra-sidecar/pull/249#discussion_r2322711240
########## adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionStatsOperations.java: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics; +import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils; +import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations; +import org.apache.cassandra.sidecar.common.server.CompactionStatsOperations; +import org.apache.cassandra.sidecar.common.server.MetricsOperations; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.data.ActiveCompactionEntryData; +import org.apache.cassandra.sidecar.common.server.data.CompactionStatsData; +import org.apache.cassandra.sidecar.common.server.data.CompletedCompactionsRateData; + +import static org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.BYTES_COMPACTED; +import static org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.COMPACTIONS_ABORTED; +import static org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.COMPACTIONS_REDUCED; +import static org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.PENDING_TASKS_BY_TABLE_NAME; +import static org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.SSTABLES_DROPPED_FROM_COMPACTION; +import static org.apache.cassandra.sidecar.adapters.base.data.CompactionStatsMetrics.TOTAL_COMPACTIONS_COMPLETED; +import static org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils.safeCast; +import static org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils.safeParseLong; + +/** + * Service responsible for fetching compaction stats from various sources including + * CassandraStorageOperations, CassandraMetricsOperations, and CompactionManagerOperations. + */ +public class CassandraCompactionStatsOperations implements CompactionStatsOperations +{ + // Default values + // Default string value when actual value is unavailable + public static final String DEFAULT_STRING_VALUE = ""; + // Default numeric value when actual value is unavailable + public static final String DEFAULT_NUMBER_VALUE = "-1"; + + // Constants for compaction info map keys + // Keyspace name being compacted + public static final String KEYSPACE = "keyspace"; + // Column family (table) name being compacted + public static final String COLUMNFAMILY = "columnfamily"; + // Number of bytes already processed in the compaction + public static final String COMPLETED = "completed"; + // Total number of bytes to be processed in the compaction + public static final String TOTAL = "total"; + // Type of compaction task (e.g., COMPACTION, VALIDATION, etc.) + public static final String TASK_TYPE = "taskType"; + // Unique compaction identifier + public static final String COMPACTION_ID = "compactionId"; + // Comma-separated list of SSTable names involved in the compaction + public static final String SSTABLES = "sstables"; + // Directory where compaction output will be written + public static final String TARGET_DIRECTORY = "targetDirectory"; + + private final StorageOperations storageOperations; + private final MetricsOperations metricsOperations; + private final CompactionManagerOperations compactionManagerOperations; + + public CassandraCompactionStatsOperations(StorageOperations storageOperations, + MetricsOperations metricsOperations, + CompactionManagerOperations compactionManagerOperations) + { + this.storageOperations = storageOperations; + this.metricsOperations = metricsOperations; + this.compactionManagerOperations = compactionManagerOperations; + } + + @Override + public CompactionStatsData compactionStats() + { + // Get concurrent compactors from StorageOperations + long concurrentCompactors = storageOperations.getConcurrentCompactors(); + + // Get pending tasks grouped by keyspace and table + Map<String, Map<String, Integer>> pendingTasks = getPendingCompactionTasksByTable(); + long totalPendingTasks = pendingTasks.values().stream() + .mapToLong(tableMap -> tableMap.values().stream().mapToInt(Integer::intValue).sum()) + .sum(); + + // Get compaction metrics from JMX counters and meters + long completedCompactionsCount = getCompactionMetricLong(TOTAL_COMPACTIONS_COMPLETED); + long dataCompactedBytes = getCompactionMetricLong(BYTES_COMPACTED); + long abortedCompactionsCount = getCompactionMetricLong(COMPACTIONS_ABORTED); + long reducedCompactionsCount = getCompactionMetricLong(COMPACTIONS_REDUCED); + long sstablesDroppedFromCompactionCount = getCompactionMetricLong(SSTABLES_DROPPED_FROM_COMPACTION); + + // Get completed compactions rate with proper time conversions + CompletedCompactionsRateData completedCompactionsRate = metricsOperations.getCompletedCompactionsRate(); + + // Get active compactions with all required fields + List<ActiveCompactionEntryData> activeCompactions = getActiveCompactions(compactionManagerOperations.getCompactions()); + long activeCompactionsCount = activeCompactions.size(); + + // Calculate remaining time in seconds based on throughput and remaining bytes + long activeCompactionsRemainingTime = calculateRemainingTimeSeconds(activeCompactions); + + return CompactionStatsData.builder() + .concurrentCompactors(concurrentCompactors) + .pendingTasks(pendingTasks) + .totalPendingTasks(totalPendingTasks) + .completedCompactions(completedCompactionsCount) + .dataCompacted(dataCompactedBytes) + .abortedCompactions(abortedCompactionsCount) + .reducedCompactions(reducedCompactionsCount) + .sstablesDroppedFromCompaction(sstablesDroppedFromCompactionCount) + .completedCompactionsRate(completedCompactionsRate) + .activeCompactions(activeCompactions) + .activeCompactionsCount(activeCompactionsCount) + .activeCompactionsRemainingTime(activeCompactionsRemainingTime) + .build(); + } + + private long getCompactionMetricLong(CompactionStatsMetrics metric) Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

