Save compaction history to system keyspace. Patch by lantao yan; reviewed by yukim for CASSANDRA-5078
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2b12784 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2b12784 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2b12784 Branch: refs/heads/trunk Commit: a2b12784fe3785fe96d9c0e2d7e8c72bfc88ac7c Parents: 01a57ee Author: lantao yan <yanlan...@hotmail.com> Authored: Mon Oct 7 15:22:11 2013 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Oct 7 15:30:50 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + .../org/apache/cassandra/config/CFMetaData.java | 11 +++ .../org/apache/cassandra/config/KSMetaData.java | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 25 ++++++ .../CompactionHistoryTabularData.java | 84 ++++++++++++++++++++ .../db/compaction/CompactionManager.java | 14 ++++ .../db/compaction/CompactionManagerMBean.java | 4 + .../cassandra/db/compaction/CompactionTask.java | 52 ++++++------ .../org/apache/cassandra/tools/NodeCmd.java | 26 ++++++ .../org/apache/cassandra/tools/NodeProbe.java | 6 ++ .../apache/cassandra/tools/NodeToolHelp.yaml | 3 + 12 files changed, 204 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ddd976e..ee631a0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ * Allow alter keyspace on system_traces (CASSANDRA-6016) * Disallow empty column names in cql (CASSANDRA-6136) * Use Java7 file-handling APIs and fix file moving on Windows (CASSANDRA-5383) + * Save compaction history to system keyspace (CASSANDRA-5078) Merged from 1.2: * Limit CQL prepared statement cache by size instead of count (CASSANDRA-6107) * Tracing should log write failure rather than raw 
exceptions (CASSANDRA-6133) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 6ed8449..37fbae7 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -23,6 +23,9 @@ New features (See blog post at TODO) - Configurable metrics reporting (see conf/metrics-reporter-config-sample.yaml) + - Compaction history and stats are now saved to system keyspace + (system.compaction_history table). You can access history via + new 'nodetool compactionhistory' command or CQL. Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 8c4075c..bbea21e 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -268,6 +268,17 @@ public final class CFMetaData + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation))" + ") WITH COMMENT='historic sstable read rates'"); + public static final CFMetaData CompactionHistoryCf = compile("CREATE TABLE " + SystemKeyspace.COMPACTION_HISTORY_CF + " (" + + "id uuid," + + "keyspace_name text," + + "columnfamily_name text," + + "compacted_at timestamp," + + "bytes_in bigint," + + "bytes_out bigint," + + "rows_merged map<int, bigint>," + + "PRIMARY KEY (id)" + + ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800"); + public enum Caching { ALL, KEYS_ONLY, ROWS_ONLY, NONE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java 
b/src/java/org/apache/cassandra/config/KSMetaData.java index 20ecda3..0a32f5c 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -90,6 +90,7 @@ public final class KSMetaData CFMetaData.SchemaColumnFamiliesCf, CFMetaData.SchemaColumnsCf, CFMetaData.CompactionLogCf, + CFMetaData.CompactionHistoryCf, CFMetaData.PaxosCf, CFMetaData.SSTableActivityCF); return new KSMetaData(Keyspace.SYSTEM_KS, LocalStrategy.class, Collections.<String, String>emptyMap(), true, cfDefs); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 1d5927a..50af82d 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import javax.management.openmbean.*; import com.google.common.base.Function; import com.google.common.collect.HashMultimap; @@ -29,6 +30,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; +import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.transport.Server; import org.apache.commons.lang3.StringUtils; @@ -80,6 +82,7 @@ public class SystemKeyspace public static final String COMPACTION_LOG = "compactions_in_progress"; public static final String PAXOS_CF = "paxos"; public static final String SSTABLE_ACTIVITY_CF = "sstable_activity"; + public static final String COMPACTION_HISTORY_CF = "compaction_history"; private static final String LOCAL_KEY = "local"; private static final 
ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local"); @@ -220,6 +223,28 @@ public class SystemKeyspace compactionLog.truncateBlocking(); } + public static void updateCompactionHistory(String ksname, + String cfname, + long compactedAt, + long bytesIn, + long bytesOut, + Map<Integer, Long> rowsMerged) + { + // don't write anything when the history table itself is compacted, since that would in turn cause new compactions + if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_CF)) + return; + String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) " + + "VALUES (%s, '%s', '%s', %d, %d, %d, {%s})"; + processInternal(String.format(req, COMPACTION_HISTORY_CF, UUIDGen.getTimeUUID().toString(), ksname, cfname, compactedAt, bytesIn, bytesOut, FBUtilities.toString(rowsMerged))); + forceBlockingFlush(COMPACTION_HISTORY_CF); + } + + public static TabularData getCompactionHistory() throws OpenDataException + { + UntypedResultSet queryResultSet = processInternal("SELECT * from system.compaction_history"); + return CompactionHistoryTabularData.from(queryResultSet); + } + public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java new file mode 100644 index 0000000..be64d44 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import javax.management.openmbean.*; +import java.util.Map; +import java.util.UUID; + +import com.google.common.base.Throwables; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.utils.FBUtilities; + +public class CompactionHistoryTabularData +{ + private static final String[] ITEM_NAMES = new String[]{ "id", "keyspace_name", "columnfamily_name", "compacted_at", + "bytes_in", "bytes_out", "rows_merged" }; + + private static final String[] ITEM_DESCS = new String[]{ "time uuid", "keyspace name", + "column family name", "compaction finished at", + "total bytes in", "total bytes out", "total rows merged" }; + + private static final String TYPE_NAME = "CompactionHistory"; + + private static final String ROW_DESC = "CompactionHistory"; + + private static final OpenType<?>[] ITEM_TYPES; + + private static final CompositeType COMPOSITE_TYPE; + + private static final TabularType TABULAR_TYPE; + + static { + try + { + ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }; + + COMPOSITE_TYPE 
= new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); + + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static TabularData from(UntypedResultSet resultSet) throws OpenDataException + { + TabularDataSupport result = new TabularDataSupport(TABULAR_TYPE); + for (UntypedResultSet.Row row : resultSet) + { + UUID id = row.getUUID(ITEM_NAMES[0]); + String ksName = row.getString(ITEM_NAMES[1]); + String cfName = row.getString(ITEM_NAMES[2]); + long compactedAt = row.getLong(ITEM_NAMES[3]); + long bytesIn = row.getLong(ITEM_NAMES[4]); + long bytesOut = row.getLong(ITEM_NAMES[5]); + Map<Integer, Long> rowMerged = row.getMap(ITEM_NAMES[6], Int32Type.instance, LongType.instance); + + result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES, + new Object[]{ id.toString(), ksName, cfName, compactedAt, bytesIn, bytesOut, + "{" + FBUtilities.toString(rowMerged) + "}" })); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 317014f..bcf9422 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -24,6 +24,8 @@ import java.util.*; import java.util.concurrent.*; import javax.management.MBeanServer; import javax.management.ObjectName; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; import com.google.common.base.Throwables; import com.google.common.collect.*; @@ -1016,6 +1018,18 @@ public class CompactionManager implements CompactionManagerMBean return out; 
} + public TabularData getCompactionHistory() + { + try + { + return SystemKeyspace.getCompactionHistory(); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + public long getTotalBytesCompacted() { return metrics.bytesCompacted.count(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java index 25f7c32..acf1e52 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import java.util.List; import java.util.Map; +import javax.management.openmbean.TabularData; public interface CompactionManagerMBean { @@ -28,6 +29,9 @@ public interface CompactionManagerMBean /** List of running compaction summary strings. 
*/ public List<String> getCompactionSummary(); + /** compaction history **/ + public TabularData getCompactionHistory(); + /** * @see org.apache.cassandra.metrics.CompactionMetrics#pendingTasks * @return estimated number of compactions remaining to perform http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index c9889cc..0b2cb54 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -244,34 +244,34 @@ public class CompactionTask extends AbstractCompactionTask for (SSTableReader sstable : sstables) sstable.preheat(cachedKeyMap.get(sstable.descriptor)); - if (logger.isInfoEnabled()) + // log a bunch of statistics about the result and save to system table compaction_history + long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long startsize = SSTable.getTotalBytes(toCompact); + long endsize = SSTable.getTotalBytes(sstables); + double ratio = (double) endsize / (double) startsize; + + StringBuilder builder = new StringBuilder(); + for (SSTableReader reader : sstables) + builder.append(reader.descriptor.baseFilename()).append(","); + + double mbps = dTime > 0 ? 
(double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; + long totalSourceRows = 0; + long[] counts = ci.getMergedRowCounts(); + StringBuilder mergeSummary = new StringBuilder(counts.length * 10); + Map<Integer, Long> mergedRows = new HashMap<Integer, Long>(); + for (int i = 0; i < counts.length; i++) { - // log a bunch of statistics about the result - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTable.getTotalBytes(toCompact); - long endsize = SSTable.getTotalBytes(sstables); - double ratio = (double)endsize / (double)startsize; - - StringBuilder builder = new StringBuilder(); - for (SSTableReader reader : sstables) - builder.append(reader.descriptor.baseFilename()).append(","); - - double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0; - long totalSourceRows = 0; - long[] counts = ci.getMergedRowCounts(); - StringBuilder mergeSummary = new StringBuilder(counts.length * 10); - for (int i = 0; i < counts.length; i++) - { - int rows = i + 1; - long count = counts[i]; - totalSourceRows += rows * count; - mergeSummary.append(String.format("%d:%d, ", rows, count)); - } - - logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total rows, %,d unique. Row merge counts were {%s}", - toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString())); - logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + int rows = i + 1; + long count = counts[i]; + totalSourceRows += rows * count; + mergeSummary.append(String.format("%d:%d, ", rows, count)); + mergedRows.put(rows, count); } + + SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, start, startsize, endsize, mergedRows); + logger.info(String.format("Compacted %d sstables to [%s]. 
%,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total rows, %,d unique. Row merge counts were {%s}", + toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString())); + logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); } private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 657d7f2..d47a4a3 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -27,6 +27,7 @@ import java.text.SimpleDateFormat; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutionException; +import javax.management.openmbean.TabularData; import com.google.common.base.Joiner; import com.google.common.collect.LinkedHashMultimap; @@ -109,6 +110,7 @@ public class NodeCmd CLEARSNAPSHOT, COMPACT, COMPACTIONSTATS, + COMPACTIONHISTORY, DECOMMISSION, DESCRIBECLUSTER, DISABLEBINARY, @@ -1085,6 +1087,7 @@ public class NodeCmd case TPSTATS : nodeCmd.printThreadPoolStats(System.out); break; case VERSION : nodeCmd.printReleaseVersion(System.out); break; case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break; + case COMPACTIONHISTORY:nodeCmd.printCompactionHistory(System.out); break; case DESCRIBECLUSTER : nodeCmd.printClusterDescription(System.out, host); break; case DISABLEBINARY : probe.stopNativeTransport(); break; case ENABLEBINARY : probe.startNativeTransport(); break; @@ -1304,6 +1307,29 @@ public class NodeCmd System.exit(probe.isFailed() ? 
1 : 0); } + private void printCompactionHistory(PrintStream out) + { + out.println("Compaction History: "); + + TabularData tabularData = this.probe.compactionHistory(); + if (tabularData.isEmpty()) + { + out.printf("There is no compaction history"); + return; + } + + String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n"; + List<String> indexNames = tabularData.getTabularType().getIndexNames(); + out.printf(format, indexNames.toArray(new String[indexNames.size()])); + + Set<?> values = tabularData.keySet(); + for (Object eachValue : values) + { + List<?> value = (List<?>) eachValue; + out.printf(format, value.toArray(new Object[value.size()])); + } + } + private static void printHistory(String[] args, ToolCommandLine cmd) { //don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 15f837f..f05cfb7 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -36,6 +36,7 @@ import javax.management.openmbean.CompositeData; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import javax.management.openmbean.TabularData; import com.google.common.base.Function; import com.google.common.collect.Iterables; @@ -864,6 +865,11 @@ public class NodeProbe { return spProxy.getReadRepairRepairedBackground(); } + + public TabularData compactionHistory() + { + return compactionProxy.getCompactionHistory(); + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml index 4e8a8a0..165a174 100644 --- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml +++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml @@ -49,6 +49,9 @@ commands: - name: compactionstats help: | Print statistics on compactions + - name: compactionhistory + help: | + Print history of compaction - name: disablebinary help: | Disable native transport (binary protocol)