Repository: cassandra Updated Branches: refs/heads/trunk 0fa19b7ce -> e70959dea
Write partition size estimates into a system table patch by Aleksey Yeschenko; reviewed by Piotr Kołaczkowski for CASSANDRA-7688 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e60089db Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e60089db Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e60089db Branch: refs/heads/trunk Commit: e60089db08c7675dd507aa668bff862d437382d0 Parents: 9014498 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Feb 5 16:06:56 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Feb 5 16:06:56 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 10 ++ .../org/apache/cassandra/config/KSMetaData.java | 3 +- .../cassandra/db/SizeEstimatesRecorder.java | 121 +++++++++++++++++++ .../org/apache/cassandra/db/SystemKeyspace.java | 42 +++++++ .../cassandra/service/CassandraDaemon.java | 12 +- 6 files changed, 182 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 192939b..959a2de 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Write partition size estimates into a system table (CASSANDRA-7688) * Upgrade libthrift to 0.9.2 (CASSANDRA-8685) * Don't use the shared ref in sstableloader (CASSANDRA-8704) * Purge internal prepared statements if related tables or http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 
e75abb7..d55d1c0 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -290,6 +290,16 @@ public final class CFMetaData + "PRIMARY KEY (id)" + ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800"); + public static final CFMetaData SizeEstimatesCf = compile("CREATE TABLE " + SystemKeyspace.SIZE_ESTIMATES_CF + " (" + + "keyspace_name text," + + "table_name text," + + "range_start text," + + "range_end text," + + "mean_partition_size bigint," + + "partitions_count bigint," + + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end)" + + ") WITH COMMENT='per-table primary range size estimates'"); + public static class SpeculativeRetry { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index 8c99191..22c59ca 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -104,7 +104,8 @@ public final class KSMetaData CFMetaData.CompactionLogCf, CFMetaData.CompactionHistoryCf, CFMetaData.PaxosCf, - CFMetaData.SSTableActivityCF); + CFMetaData.SSTableActivityCF, + CFMetaData.SizeEstimatesCf); return new KSMetaData(Keyspace.SYSTEM_KS, LocalStrategy.class, Collections.<String, String>emptyMap(), true, cfDefs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java new file mode 100644 index 0000000..b739ba5 --- /dev/null +++ 
b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.service.MigrationListener; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Pair; + +/** + * A very simplistic/crude partition count/size estimator. + * + * Exposing per-primary-range estimated partitions count and size in CQL form, + * as a direct CQL alternative to Thrift's describe_splits_ex(). + * + * Estimates (per primary range) are calculated and dumped into a system table (system.size_estimates) every 5 minutes. + * + * See CASSANDRA-7688. 
+ */ +public class SizeEstimatesRecorder extends MigrationListener implements Runnable +{ + private static final Logger logger = LoggerFactory.getLogger(SizeEstimatesRecorder.class); + + public static final SizeEstimatesRecorder instance = new SizeEstimatesRecorder(); + + private SizeEstimatesRecorder() + { + MigrationManager.instance.register(this); + } + + public void run() + { + logger.debug("Recording size estimates"); + + // find primary token ranges for the local node. + Collection<Token> localTokens = StorageService.instance.getLocalTokens(); + Collection<Range<Token>> localRanges = StorageService.instance.getTokenMetadata().getPrimaryRangesFor(localTokens); + + for (Keyspace keyspace : Keyspace.nonSystem()) + for (ColumnFamilyStore table : keyspace.getColumnFamilyStores()) + recordSizeEstimates(table, localRanges); + } + + private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges) + { + // for each local primary range, estimate (crudely) mean partition size and partitions count. + Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size()); + for (Range<Token> range : localRanges) + { + // filter sstables that have partitions in this range. + List<SSTableReader> sstables = table.viewFilter(range.toRowBounds()).apply(table.getDataTracker().getView()); + SSTableReader.acquireReferences(sstables); + + long partitionsCount, meanPartitionSize; + try + { + // calculate the estimates. + partitionsCount = estimatePartitionsCount(sstables, range); + meanPartitionSize = estimateMeanPartitionSize(sstables); + } + finally + { + SSTableReader.releaseReferences(sstables); + } + + estimates.put(range, Pair.create(partitionsCount, meanPartitionSize)); + } + + // atomically update the estimates. 
+ SystemKeyspace.updateSizeEstimates(table.metadata.ksName, table.metadata.cfName, estimates); + } + + private long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range) + { + long count = 0; + for (SSTableReader sstable : sstables) + count += sstable.estimatedKeysForRanges(Collections.singleton(range)); + return count; + } + + private long estimateMeanPartitionSize(Collection<SSTableReader> sstables) + { + long sum = 0, count = 0; + for (SSTableReader sstable : sstables) + { + long n = sstable.getEstimatedRowSize().count(); + sum += sstable.getEstimatedRowSize().mean() * n; + count += n; + } + return count > 0 ? sum / count : 0; + } + + @Override + public void onDropColumnFamily(String keyspace, String table) + { + SystemKeyspace.clearSizeEstimates(keyspace, table); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 2a3535a..7338b28 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -84,6 +84,7 @@ public class SystemKeyspace public static final String PAXOS_CF = "paxos"; public static final String SSTABLE_ACTIVITY_CF = "sstable_activity"; public static final String COMPACTION_HISTORY_CF = "compaction_history"; + public static final String SIZE_ESTIMATES_CF = "size_estimates"; private static final String LOCAL_KEY = "local"; @@ -941,4 +942,45 @@ public class SystemKeyspace String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? 
and generation=?"; executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF), keyspace, table, generation); } + + /** + * Writes the current partition count and size estimates into SIZE_ESTIMATES_CF + */ + public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates) + { + long timestamp = FBUtilities.timestampMicros(); + CFMetaData estimatesTable = CFMetaData.SizeEstimatesCf; + Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UTF8Type.instance.decompose(keyspace)); + + // delete all previous values with a single range tombstone. + mutation.deleteRange(SIZE_ESTIMATES_CF, + estimatesTable.comparator.make(table).start(), + estimatesTable.comparator.make(table).end(), + timestamp - 1); + + // add a CQL row for each primary token range. + ColumnFamily cells = mutation.addOrGet(estimatesTable); + for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet()) + { + Range<Token> range = entry.getKey(); + Pair<Long, Long> values = entry.getValue(); + Composite prefix = estimatesTable.comparator.make(table, range.left.toString(), range.right.toString()); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + adder.add("partitions_count", values.left) + .add("mean_partition_size", values.right); + } + + mutation.apply(); + } + + /** + * Clears size estimates for a table (on table drop) + */ + public static void clearSizeEstimates(String keyspace, String table) + { + String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ? 
AND table_name = ?", + Keyspace.SYSTEM_KS, + SIZE_ESTIMATES_CF); + executeInternal(cql, keyspace, table); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index c2b7b7d..1198b3e 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -45,10 +45,7 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.exceptions.ConfigurationException; @@ -330,7 +327,7 @@ public class CassandraDaemon } } }; - ScheduledExecutors.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); + ScheduledExecutors.optionalTasks.schedule(runnable, 5, TimeUnit.MINUTES); SystemKeyspace.finishStartup(); @@ -368,7 +365,10 @@ public class CassandraDaemon if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) waitForGossipToSettle(); - // Thift + // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, 5 * 60, TimeUnit.SECONDS); + + // Thrift InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); int rpcPort = DatabaseDescriptor.getRpcPort(); int listenBacklog = 
DatabaseDescriptor.getRpcListenBacklog();