Repository: cassandra
Updated Branches:
  refs/heads/trunk 0fa19b7ce -> e70959dea


Write partition size estimates into a system table

patch by Aleksey Yeschenko; reviewed by Piotr Kołaczkowski for
CASSANDRA-7688


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e60089db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e60089db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e60089db

Branch: refs/heads/trunk
Commit: e60089db08c7675dd507aa668bff862d437382d0
Parents: 9014498
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Thu Feb 5 16:06:56 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Thu Feb 5 16:06:56 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |  10 ++
 .../org/apache/cassandra/config/KSMetaData.java |   3 +-
 .../cassandra/db/SizeEstimatesRecorder.java     | 121 +++++++++++++++++++
 .../org/apache/cassandra/db/SystemKeyspace.java |  42 +++++++
 .../cassandra/service/CassandraDaemon.java      |  12 +-
 6 files changed, 182 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 192939b..959a2de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Write partition size estimates into a system table (CASSANDRA-7688)
  * Upgrade libthrift to 0.9.2 (CASSANDRA-8685)
  * Don't use the shared ref in sstableloader (CASSANDRA-8704)
  * Purge internal prepared statements if related tables or

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index e75abb7..d55d1c0 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -290,6 +290,16 @@ public final class CFMetaData
                                                                  + "PRIMARY 
KEY (id)"
                                                                  + ") WITH 
COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800");
 
+    // system.size_estimates: keyspace_name is the partition key; each row is keyed by
+    // (table_name, range_start, range_end) and holds a partition count and a mean
+    // partition size estimate for that token range.
+    public static final CFMetaData SizeEstimatesCf = compile("CREATE TABLE " + 
SystemKeyspace.SIZE_ESTIMATES_CF + " ("
+                                                             + "keyspace_name 
text,"
+                                                             + "table_name 
text,"
+                                                             + "range_start 
text,"
+                                                             + "range_end 
text,"
+                                                             + 
"mean_partition_size bigint,"
+                                                             + 
"partitions_count bigint,"
+                                                             + "PRIMARY KEY 
((keyspace_name), table_name, range_start, range_end)"
+                                                             + ") WITH 
COMMENT='per-table primary range size estimates'");
+
 
     public static class SpeculativeRetry
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 8c99191..22c59ca 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -104,7 +104,8 @@ public final class KSMetaData
                                                 CFMetaData.CompactionLogCf,
                                                 CFMetaData.CompactionHistoryCf,
                                                 CFMetaData.PaxosCf,
-                                                CFMetaData.SSTableActivityCF);
+                                                CFMetaData.SSTableActivityCF,
+                                                CFMetaData.SizeEstimatesCf);
         return new KSMetaData(Keyspace.SYSTEM_KS, LocalStrategy.class, 
Collections.<String, String>emptyMap(), true, cfDefs);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
new file mode 100644
index 0000000..b739ba5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.service.MigrationListener;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * A very simplistic/crude partition count/size estimator.
+ *
+ * Exposes per-primary-range estimated partition counts and mean partition sizes in CQL form,
+ * as a direct CQL alternative to Thrift's describe_splits_ex().
+ *
+ * Estimates (per primary range) are calculated and dumped into a system table
+ * (system.size_estimates) every 5 minutes.
+ *
+ * See CASSANDRA-7688.
+ */
+public class SizeEstimatesRecorder extends MigrationListener implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(SizeEstimatesRecorder.class);
+
+    public static final SizeEstimatesRecorder instance = new SizeEstimatesRecorder();
+
+    private SizeEstimatesRecorder()
+    {
+        // listen for schema changes so that estimates of dropped tables get cleared.
+        MigrationManager.instance.register(this);
+    }
+
+    /**
+     * Recomputes the estimates of every table in every non-system keyspace, for each primary
+     * token range of the local node, and writes them into system.size_estimates.
+     */
+    public void run()
+    {
+        logger.debug("Recording size estimates");
+
+        // find primary token ranges for the local node.
+        Collection<Token> localTokens = StorageService.instance.getLocalTokens();
+        Collection<Range<Token>> localRanges = StorageService.instance.getTokenMetadata().getPrimaryRangesFor(localTokens);
+
+        for (Keyspace keyspace : Keyspace.nonSystem())
+            for (ColumnFamilyStore table : keyspace.getColumnFamilyStores())
+                recordSizeEstimates(table, localRanges);
+    }
+
+    /**
+     * Computes a (partitions count, mean partition size) pair for each given range of
+     * {@code table} and hands the whole map to SystemKeyspace.updateSizeEstimates().
+     */
+    private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
+    {
+        // for each local primary range, estimate (crudely) mean partition size and partitions count.
+        Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
+        for (Range<Token> range : localRanges)
+        {
+            // filter sstables that have partitions in this range; all of them are referenced on return.
+            List<SSTableReader> sstables = selectAndReference(table, range);
+
+            long partitionsCount, meanPartitionSize;
+            try
+            {
+                // calculate the estimates.
+                partitionsCount = estimatePartitionsCount(sstables, range);
+                meanPartitionSize = estimateMeanPartitionSize(sstables);
+            }
+            finally
+            {
+                // safe: every sstable in the list was successfully referenced by selectAndReference().
+                SSTableReader.releaseReferences(sstables);
+            }
+
+            estimates.put(range, Pair.create(partitionsCount, meanPartitionSize));
+        }
+
+        // atomically update the estimates.
+        SystemKeyspace.updateSizeEstimates(table.metadata.ksName, table.metadata.cfName, estimates);
+    }
+
+    /**
+     * Returns the sstables of {@code table} that may hold partitions in {@code range}, with a
+     * reference acquired on each. The caller must release them with
+     * SSTableReader.releaseReferences().
+     *
+     * If acquireReferences() reports failure, the view is re-read and selection is retried,
+     * rather than proceeding and later releasing references that were never acquired.
+     * NOTE(review): relies on acquireReferences() returning false (having undone any partial
+     * acquisition) when some sstable can no longer be referenced — confirm against SSTableReader.
+     */
+    private static List<SSTableReader> selectAndReference(ColumnFamilyStore table, Range<Token> range)
+    {
+        while (true)
+        {
+            List<SSTableReader> sstables = table.viewFilter(range.toRowBounds()).apply(table.getDataTracker().getView());
+            if (SSTableReader.acquireReferences(sstables))
+                return sstables;
+            // an sstable went away between selection and referencing; retry against a fresh view.
+        }
+    }
+
+    /**
+     * Sums each sstable's estimated key count for {@code range}. Partitions present in several
+     * sstables are counted once per sstable that holds them.
+     */
+    private long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
+    {
+        long count = 0;
+        for (SSTableReader sstable : sstables)
+            count += sstable.estimatedKeysForRanges(Collections.singleton(range));
+        return count;
+    }
+
+    /**
+     * Mean of the per-sstable estimated row size means, weighted by each sstable's estimated
+     * row count (over the whole sstable, not only the range); 0 when there is no data.
+     */
+    private long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
+    {
+        long sum = 0, count = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            long n = sstable.getEstimatedRowSize().count();
+            sum += sstable.getEstimatedRowSize().mean() * n;
+            count += n;
+        }
+        return count > 0 ? sum / count : 0;
+    }
+
+    /**
+     * Removes the stored estimates of a dropped table.
+     * NOTE(review): only table drops are handled here; confirm whether dropping a whole keyspace
+     * also reaches this callback, otherwise that keyspace's estimates are left behind.
+     */
+    @Override
+    public void onDropColumnFamily(String keyspace, String table)
+    {
+        SystemKeyspace.clearSizeEstimates(keyspace, table);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2a3535a..7338b28 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -84,6 +84,7 @@ public class SystemKeyspace
     public static final String PAXOS_CF = "paxos";
     public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
     public static final String COMPACTION_HISTORY_CF = "compaction_history";
+    public static final String SIZE_ESTIMATES_CF = "size_estimates";
 
     private static final String LOCAL_KEY = "local";
 
@@ -941,4 +942,45 @@ public class SystemKeyspace
         String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND 
columnfamily_name=? and generation=?";
         executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF), keyspace, 
table, generation);
     }
+
+    /**
+     * Replaces every size estimate stored in SIZE_ESTIMATES_CF for {@code keyspace}.{@code table}
+     * with {@code estimates}, in a single mutation.
+     *
+     * Each map value is (partitions_count, mean_partition_size) for the range used as its key.
+     */
+    public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
+    {
+        CFMetaData estimatesTable = CFMetaData.SizeEstimatesCf;
+        long now = FBUtilities.timestampMicros();
+
+        // keyspace_name is the partition key; rows are clustered by (table_name, range_start, range_end).
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UTF8Type.instance.decompose(keyspace));
+
+        // one range tombstone over all of this table's rows, timestamped just below the new
+        // cells so it removes the old estimates without shadowing the fresh ones.
+        Composite tablePrefix = estimatesTable.comparator.make(table);
+        mutation.deleteRange(SIZE_ESTIMATES_CF, tablePrefix.start(), tablePrefix.end(), now - 1);
+
+        // one CQL row per primary token range.
+        ColumnFamily cells = mutation.addOrGet(estimatesTable);
+        for (Map.Entry<Range<Token>, Pair<Long, Long>> estimate : estimates.entrySet())
+        {
+            Range<Token> range = estimate.getKey();
+            Pair<Long, Long> counts = estimate.getValue();
+            Composite rowPrefix = estimatesTable.comparator.make(table, range.left.toString(), range.right.toString());
+            new CFRowAdder(cells, rowPrefix, now).add("partitions_count", counts.left)
+                                                 .add("mean_partition_size", counts.right);
+        }
+
+        mutation.apply();
+    }
+
+    /**
+     * Deletes every size estimate row of {@code keyspace}.{@code table} from SIZE_ESTIMATES_CF;
+     * used when the table is dropped.
+     */
+    public static void clearSizeEstimates(String keyspace, String table)
+    {
+        String cql = "DELETE FROM " + Keyspace.SYSTEM_KS + "." + SIZE_ESTIMATES_CF
+                     + " WHERE keyspace_name = ? AND table_name = ?";
+        executeInternal(cql, keyspace, table);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60089db/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index c2b7b7d..1198b3e 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -45,10 +45,7 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -330,7 +327,7 @@ public class CassandraDaemon
                 }
             }
         };
-        ScheduledExecutors.optionalTasks.schedule(runnable, 5 * 60, 
TimeUnit.SECONDS);
+        ScheduledExecutors.optionalTasks.schedule(runnable, 5, 
TimeUnit.MINUTES);
 
         SystemKeyspace.finishStartup();
 
@@ -368,7 +365,10 @@ public class CassandraDaemon
         if 
(!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
             waitForGossipToSettle();
 
-        // Thift
+        // schedule periodic dumps of table size estimates into 
SystemKeyspace.SIZE_ESTIMATES_CF
+        
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
 30, 5 * 60, TimeUnit.SECONDS);
+
+        // Thrift
         InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
         int rpcPort = DatabaseDescriptor.getRpcPort();
         int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();

Reply via email to