Author: jbellis Date: Tue Jul 26 18:42:37 2011 New Revision: 1151205 URL: http://svn.apache.org/viewvc?rev=1151205&view=rev Log: add row_cache_keys_to_save CF option patch by Chris Burroughs; reviewed by jbellis for CASSANDRA-1966
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/interface/cassandra.thrift cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java cassandra/trunk/src/avro/internode.genavro cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Tue Jul 26 18:42:37 2011 @@ -18,6 +18,7 @@ * store hints as serialized mutations instead of pointers to data rows * store hints in the coordinator node instead of in the closest replica (CASSANDRA-2914). + * add row_cache_keys_to_save CF option (CASSANDRA-1966) 0.8.2 Modified: cassandra/trunk/interface/cassandra.thrift URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/interface/cassandra.thrift (original) +++ cassandra/trunk/interface/cassandra.thrift Tue Jul 26 18:42:37 2011 @@ -46,7 +46,7 @@ namespace rb CassandraThrift # for every edit that doesn't result in a change to major/minor. # # See the Semantic Versioning Specification (SemVer) http://semver.org. -const string VERSION = "19.10.0" +const string VERSION = "19.11.0" # @@ -396,6 +396,7 @@ struct CfDef { 28: optional binary key_alias, 29: optional string compaction_strategy, 30: optional map<string,string> compaction_strategy_options, + 31: optional i32 row_cache_keys_to_save, } /* describes a keyspace. */ Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original) +++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Tue Jul 26 18:42:37 2011 @@ -9086,6 +9086,8 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -17041,8 +17043,6 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -25752,6 +25752,8 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original) +++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Jul 26 18:42:37 2011 @@ -71,6 +71,7 @@ public class CfDef implements org.apache private static final org.apache.thrift.protocol.TField KEY_ALIAS_FIELD_DESC = new org.apache.thrift.protocol.TField("key_alias", org.apache.thrift.protocol.TType.STRING, (short)28); private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy", org.apache.thrift.protocol.TType.STRING, (short)29); private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy_options", org.apache.thrift.protocol.TType.MAP, (short)30); + private static final org.apache.thrift.protocol.TField ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_keys_to_save", org.apache.thrift.protocol.TType.I32, (short)31); public String keyspace; public String name; @@ -98,6 +99,7 @@ public class CfDef implements org.apache public ByteBuffer key_alias; public String compaction_strategy; public Map<String,String> compaction_strategy_options; + public int row_cache_keys_to_save; /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -126,7 +128,8 @@ public class CfDef implements org.apache ROW_CACHE_PROVIDER((short)27, "row_cache_provider"), KEY_ALIAS((short)28, "key_alias"), COMPACTION_STRATEGY((short)29, "compaction_strategy"), - COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options"); + COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options"), + ROW_CACHE_KEYS_TO_SAVE((short)31, "row_cache_keys_to_save"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -193,6 +196,8 @@ public class CfDef implements org.apache return COMPACTION_STRATEGY; case 30: // COMPACTION_STRATEGY_OPTIONS return COMPACTION_STRATEGY_OPTIONS; + case 31: // ROW_CACHE_KEYS_TO_SAVE + return ROW_CACHE_KEYS_TO_SAVE; default: return null; } @@ -246,7 +251,8 @@ public class CfDef implements org.apache private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 10; private static final int __REPLICATE_ON_WRITE_ISSET_ID = 11; private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 12; - private BitSet __isset_bit_vector = new BitSet(13); + private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 13; + private BitSet __isset_bit_vector = new BitSet(14); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -306,6 +312,8 @@ public class CfDef implements org.apache new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + tmpMap.put(_Fields.ROW_CACHE_KEYS_TO_SAVE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_keys_to_save", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap); } @@ -409,6 +417,7 @@ public class CfDef implements org.apache } this.compaction_strategy_options = __this__compaction_strategy_options; } + this.row_cache_keys_to_save = other.row_cache_keys_to_save; } public CfDef deepCopy() { @@ -459,6 +468,8 @@ public class CfDef implements org.apache this.key_alias = null; this.compaction_strategy = null; this.compaction_strategy_options = null; + setRow_cache_keys_to_saveIsSet(false); + this.row_cache_keys_to_save = 0; } public String getKeyspace() { @@ -1108,6 +1119,29 @@ public class CfDef implements org.apache } } + public int getRow_cache_keys_to_save() { + return this.row_cache_keys_to_save; + } + + public CfDef setRow_cache_keys_to_save(int row_cache_keys_to_save) { + this.row_cache_keys_to_save = row_cache_keys_to_save; + setRow_cache_keys_to_saveIsSet(true); + return this; + } + + public void unsetRow_cache_keys_to_save() { + __isset_bit_vector.clear(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID); + } + + /** Returns true if field row_cache_keys_to_save is set (has been assigned a value) and false otherwise */ + public boolean isSetRow_cache_keys_to_save() { + return __isset_bit_vector.get(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID); + } + + public void setRow_cache_keys_to_saveIsSet(boolean value) { + __isset_bit_vector.set(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case KEYSPACE: @@ -1318,6 +1352,14 @@ public class CfDef implements org.apache } break; + case ROW_CACHE_KEYS_TO_SAVE: + if (value == null) { + unsetRow_cache_keys_to_save(); + } else { + setRow_cache_keys_to_save((Integer)value); + } + break; + } } @@ -1401,6 +1443,9 @@ public class CfDef implements org.apache case COMPACTION_STRATEGY_OPTIONS: return getCompaction_strategy_options(); + case ROW_CACHE_KEYS_TO_SAVE: + return new Integer(getRow_cache_keys_to_save()); + } throw new IllegalStateException(); } @@ -1464,6 +1509,8 @@ public class CfDef implements org.apache return isSetCompaction_strategy(); case COMPACTION_STRATEGY_OPTIONS: return isSetCompaction_strategy_options(); + case ROW_CACHE_KEYS_TO_SAVE: + return isSetRow_cache_keys_to_save(); } throw new IllegalStateException(); } @@ -1715,6 +1762,15 @@ public class CfDef implements org.apache return false; } + boolean this_present_row_cache_keys_to_save = true && this.isSetRow_cache_keys_to_save(); + boolean that_present_row_cache_keys_to_save = true && that.isSetRow_cache_keys_to_save(); + if (this_present_row_cache_keys_to_save || that_present_row_cache_keys_to_save) { + if (!(this_present_row_cache_keys_to_save && that_present_row_cache_keys_to_save)) + return false; + if (this.row_cache_keys_to_save != that.row_cache_keys_to_save) + return false; + } + return true; } @@ -1852,6 +1908,11 @@ public class CfDef implements org.apache if (present_compaction_strategy_options) builder.append(compaction_strategy_options); + boolean present_row_cache_keys_to_save = true && (isSetRow_cache_keys_to_save()); + builder.append(present_row_cache_keys_to_save); + if (present_row_cache_keys_to_save) + builder.append(row_cache_keys_to_save); + return builder.toHashCode(); } @@ -2123,6 +2184,16 @@ public class CfDef implements org.apache return lastComparison; } } + lastComparison = Boolean.valueOf(isSetRow_cache_keys_to_save()).compareTo(typedOther.isSetRow_cache_keys_to_save()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRow_cache_keys_to_save()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.row_cache_keys_to_save, typedOther.row_cache_keys_to_save); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -2358,6 +2429,14 @@ public class CfDef implements org.apache org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 31: // ROW_CACHE_KEYS_TO_SAVE + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.row_cache_keys_to_save = iprot.readI32(); + setRow_cache_keys_to_saveIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -2540,6 +2619,11 @@ public class CfDef implements org.apache oprot.writeFieldEnd(); } } + if (isSetRow_cache_keys_to_save()) { + oprot.writeFieldBegin(ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC); + oprot.writeI32(this.row_cache_keys_to_save); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -2752,6 +2836,12 @@ public class CfDef implements org.apache } first = false; } + if (isSetRow_cache_keys_to_save()) { + if (!first) sb.append(", "); + sb.append("row_cache_keys_to_save:"); + sb.append(this.row_cache_keys_to_save); + first = false; + } sb.append(")"); return sb.toString(); } Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java (original) +++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java Tue Jul 26 18:42:37 2011 @@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory; public class Constants { - public static final String VERSION = "19.10.0"; + public static final String VERSION = "19.11.0"; } Modified: cassandra/trunk/src/avro/internode.genavro URL: http://svn.apache.org/viewvc/cassandra/trunk/src/avro/internode.genavro?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/avro/internode.genavro (original) +++ cassandra/trunk/src/avro/internode.genavro Tue Jul 26 18:42:37 2011 @@ -57,6 +57,7 @@ protocol InterNode { union { null, int } max_compaction_threshold = null; union { int, null } row_cache_save_period_in_seconds = 0; union { int, null } key_cache_save_period_in_seconds = 3600; + union { int, null } row_cache_keys_to_save = null; union { null, int } memtable_throughput_in_mb = null; union { null, double} memtable_operations_in_millions = null; union { null, double} merge_shards_chance = null; Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Tue Jul 26 18:42:37 2011 @@ -77,12 +77,12 @@ public abstract class AutoSavingCache<K, return DatabaseDescriptor.getSerializedCachePath(tableName, cfName, cacheType); } - public Writer getWriter() + public Writer getWriter(int keysToSave) { - return new Writer(tableName, cfName); + return new Writer(tableName, cfName, keysToSave); } - public void scheduleSaving(int savePeriodInSeconds) + public void scheduleSaving(int savePeriodInSeconds, final int keysToSave) { if (saveTask != null) { @@ -95,7 +95,7 @@ public abstract class AutoSavingCache<K, { public void runMayThrow() { - submitWrite(); + submitWrite(keysToSave); } }; saveTask = StorageService.tasks.scheduleWithFixedDelay(runnable, @@ -105,9 +105,9 @@ public abstract class AutoSavingCache<K, } } - public Future<?> submitWrite() + public Future<?> submitWrite(int keysToSave) { - return CompactionManager.instance.submitCacheWrite(getWriter()); + return CompactionManager.instance.submitCacheWrite(getWriter(keysToSave)); } public Set<DecoratedKey> readSaved() @@ -195,9 +195,12 @@ public abstract class AutoSavingCache<K, private final long estimatedTotalBytes; private long bytesWritten; - private Writer(String ksname, String cfname) + private Writer(String ksname, String cfname, int keysToSave) { - keys = getKeySet(); + if (keysToSave >= getKeySet().size()) + keys = getKeySet(); + else + keys = hotKeySet(keysToSave); long bytes = 0; for (K key : keys) bytes += translateKey(key).remaining(); Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java Tue Jul 26 18:42:37 2011 @@ -96,6 +96,11 @@ public class ConcurrentLinkedHashCache<K return map.keySet(); } + public Set<K> hotKeySet(int n) + { + return map.descendingKeySetWithLimit(n); + } + public boolean isPutCopying() { return false; Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java Tue Jul 26 18:42:37 2011 @@ -46,6 +46,8 @@ public interface ICache<K, V> public Set<K> keySet(); + public Set<K> hotKeySet(int n); + /** * @return true if the cache implementation inherently copies the cached values; otherwise, * the caller should copy manually before caching shared values like Thrift ByteBuffers. Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java Tue Jul 26 18:42:37 2011 @@ -150,6 +150,11 @@ public class InstrumentingCache<K, V> im return map.keySet(); } + public Set<K> hotKeySet(int n) + { + return map.hotKeySet(n); + } + public boolean isPutCopying() { return map.isPutCopying(); Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java Tue Jul 26 18:42:37 2011 @@ -163,6 +163,11 @@ public class SerializingCache<K, V> impl return map.keySet(); } + public Set<K> hotKeySet(int n) + { + return map.descendingKeySetWithLimit(n); + } + public boolean isPutCopying() { return true; Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Jul 26 18:42:37 2011 @@ -119,6 +119,7 @@ public class CliClient COMMENT, ROWS_CACHED, ROW_CACHE_SAVE_PERIOD, + ROW_CACHE_KEYS_TO_SAVE, KEYS_CACHED, KEY_CACHE_SAVE_PERIOD, READ_REPAIR_CHANCE, @@ -1231,6 +1232,9 @@ public class CliClient case KEY_CACHE_SAVE_PERIOD: cfDef.setKey_cache_save_period_in_seconds(Integer.parseInt(mValue)); break; + case ROW_CACHE_KEYS_TO_SAVE: + cfDef.setRow_cache_keys_to_save(Integer.parseInt(mValue)); + break; case DEFAULT_VALIDATION_CLASS: cfDef.setDefault_validation_class(CliUtils.unescapeSQLString(mValue)); break; @@ -1716,7 +1720,9 @@ public class CliClient if (cf_def.default_validation_class != null) sessionState.out.printf(" Default column value validator: %s%n", cf_def.default_validation_class); sessionState.out.printf(" Columns sorted by: %s%s%n", cf_def.comparator_type, cf_def.column_type.equals("Super") ? "/" + cf_def.subcomparator_type : ""); - sessionState.out.printf(" Row cache size / save period in seconds: %s/%s%n", cf_def.row_cache_size, cf_def.row_cache_save_period_in_seconds); + sessionState.out.printf(" Row cache size / save period in seconds / keys to save : %s/%s/%s%n", + cf_def.row_cache_size, cf_def.row_cache_save_period_in_seconds, + cf_def.row_cache_keys_to_save == Integer.MAX_VALUE ? "all" : cf_def.row_cache_keys_to_save); sessionState.out.printf(" Key cache size / save period in seconds: %s/%s%n", cf_def.key_cache_size, cf_def.key_cache_save_period_in_seconds); sessionState.out.printf(" Memtable thresholds: %s/%s (millions of ops/MB)%n", cf_def.memtable_operations_in_millions, cf_def.memtable_throughput_in_mb); Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jul 26 18:42:37 2011 @@ -70,6 +70,7 @@ public final class CFMetaData public final static int DEFAULT_SYSTEM_MEMTABLE_THROUGHPUT_IN_MB = 8; public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0; public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 4 * 3600; + public final static int DEFAULT_ROW_CACHE_KEYS_TO_SAVE = Integer.MAX_VALUE; public final static int DEFAULT_GC_GRACE_SECONDS = 864000; public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4; public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32; @@ -164,6 +165,7 @@ public final class CFMetaData private int maxCompactionThreshold; // default 32 private int rowCacheSavePeriodInSeconds; // default 0 (off) private int keyCacheSavePeriodInSeconds; // default 3600 (1 hour) + private int rowCacheKeysToSave; // default max int (aka feature is off) private int memtableThroughputInMb; // default based on heap size private double memtableOperationsInMillions; // default based on throughput private double mergeShardsChance; // default 0.1, chance [0.0, 1.0] of merging old shards during replication @@ -186,6 +188,7 @@ public final class CFMetaData public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;} public CFMetaData rowCacheSavePeriod(int prop) {rowCacheSavePeriodInSeconds = prop; return this;} public CFMetaData keyCacheSavePeriod(int prop) {keyCacheSavePeriodInSeconds = prop; return this;} + public CFMetaData rowCacheKeysToSave(int prop) {rowCacheKeysToSave = prop; return this;} public CFMetaData memSize(int prop) {memtableThroughputInMb = prop; return this;} public CFMetaData memOps(double prop) {memtableOperationsInMillions = prop; return this;} public CFMetaData mergeShardsChance(double prop) {mergeShardsChance = prop; return this;} @@ -231,6 +234,7 @@ public final class CFMetaData // Set a bunch of defaults rowCacheSize = DEFAULT_ROW_CACHE_SIZE; keyCacheSize = DEFAULT_KEY_CACHE_SIZE; + rowCacheKeysToSave = DEFAULT_ROW_CACHE_KEYS_TO_SAVE; readRepairChance = DEFAULT_READ_REPAIR_CHANCE; replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE; gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS; @@ -319,6 +323,7 @@ public final class CFMetaData .maxCompactionThreshold(oldCFMD.maxCompactionThreshold) .rowCacheSavePeriod(oldCFMD.rowCacheSavePeriodInSeconds) .keyCacheSavePeriod(oldCFMD.keyCacheSavePeriodInSeconds) + .rowCacheKeysToSave(oldCFMD.rowCacheKeysToSave) .memSize(oldCFMD.memtableThroughputInMb) .memOps(oldCFMD.memtableOperationsInMillions) .columnMetadata(oldCFMD.column_metadata) @@ -368,6 +373,7 @@ public final class CFMetaData cf.max_compaction_threshold = maxCompactionThreshold; cf.row_cache_save_period_in_seconds = rowCacheSavePeriodInSeconds; cf.key_cache_save_period_in_seconds = keyCacheSavePeriodInSeconds; + cf.row_cache_keys_to_save = rowCacheKeysToSave; cf.memtable_throughput_in_mb = memtableThroughputInMb; cf.memtable_operations_in_millions = memtableOperationsInMillions; cf.merge_shards_chance = mergeShardsChance; @@ -430,6 +436,7 @@ public final class CFMetaData if (cf.max_compaction_threshold != null) { newCFMD.maxCompactionThreshold(cf.max_compaction_threshold); } if (cf.row_cache_save_period_in_seconds != null) { newCFMD.rowCacheSavePeriod(cf.row_cache_save_period_in_seconds); } if (cf.key_cache_save_period_in_seconds != null) { newCFMD.keyCacheSavePeriod(cf.key_cache_save_period_in_seconds); } + if (cf.row_cache_keys_to_save != null) { newCFMD.rowCacheKeysToSave(cf.row_cache_keys_to_save); } if (cf.memtable_throughput_in_mb != null) { newCFMD.memSize(cf.memtable_throughput_in_mb); } if (cf.memtable_operations_in_millions != null) { newCFMD.memOps(cf.memtable_operations_in_millions); } if (cf.merge_shards_chance != null) { newCFMD.mergeShardsChance(cf.merge_shards_chance); } @@ -538,6 +545,11 @@ public final class CFMetaData return keyCacheSavePeriodInSeconds; } + public int getRowCacheKeysToSave() + { + return rowCacheKeysToSave; + } + public int getMemtableThroughputInMb() { return memtableThroughputInMb; @@ -600,6 +612,7 @@ public final class CFMetaData .append(column_metadata, rhs.column_metadata) .append(rowCacheSavePeriodInSeconds, rhs.rowCacheSavePeriodInSeconds) .append(keyCacheSavePeriodInSeconds, rhs.keyCacheSavePeriodInSeconds) + .append(rowCacheKeysToSave, rhs.rowCacheKeysToSave) .append(memtableThroughputInMb, rhs.memtableThroughputInMb) .append(memtableOperationsInMillions, rhs.memtableOperationsInMillions) .append(mergeShardsChance, rhs.mergeShardsChance) @@ -631,6 +644,7 @@ public final class CFMetaData .append(column_metadata) .append(rowCacheSavePeriodInSeconds) .append(keyCacheSavePeriodInSeconds) + .append(rowCacheKeysToSave) .append(memtableThroughputInMb) .append(memtableOperationsInMillions) .append(mergeShardsChance) @@ -669,6 +683,8 @@ public final class CFMetaData cf_def.setRow_cache_save_period_in_seconds(CFMetaData.DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS); if (!cf_def.isSetKey_cache_save_period_in_seconds()) cf_def.setKey_cache_save_period_in_seconds(CFMetaData.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS); + if (!cf_def.isSetRow_cache_keys_to_save()) + cf_def.setRow_cache_keys_to_save(CFMetaData.DEFAULT_ROW_CACHE_KEYS_TO_SAVE); if (!cf_def.isSetMemtable_throughput_in_mb()) cf_def.setMemtable_throughput_in_mb(CFMetaData.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB); if (!cf_def.isSetMemtable_operations_in_millions()) @@ -704,6 +720,7 @@ public final class CFMetaData if (cf_def.isSetMax_compaction_threshold()) { newCFMD.maxCompactionThreshold(cf_def.max_compaction_threshold); } if (cf_def.isSetRow_cache_save_period_in_seconds()) { newCFMD.rowCacheSavePeriod(cf_def.row_cache_save_period_in_seconds); } if (cf_def.isSetKey_cache_save_period_in_seconds()) { newCFMD.keyCacheSavePeriod(cf_def.key_cache_save_period_in_seconds); } + if (cf_def.isSetRow_cache_keys_to_save()) { newCFMD.rowCacheKeysToSave(cf_def.row_cache_keys_to_save); } if (cf_def.isSetMemtable_throughput_in_mb()) { newCFMD.memSize(cf_def.memtable_throughput_in_mb); } if (cf_def.isSetMemtable_operations_in_millions()) { newCFMD.memOps(cf_def.memtable_operations_in_millions); } if (cf_def.isSetMerge_shards_chance()) { newCFMD.mergeShardsChance(cf_def.merge_shards_chance); } @@ -776,6 +793,7 @@ public final class CFMetaData maxCompactionThreshold = cf_def.max_compaction_threshold; rowCacheSavePeriodInSeconds = cf_def.row_cache_save_period_in_seconds; keyCacheSavePeriodInSeconds = cf_def.key_cache_save_period_in_seconds; + rowCacheKeysToSave = cf_def.row_cache_keys_to_save; memtableThroughputInMb = cf_def.memtable_throughput_in_mb; memtableOperationsInMillions = cf_def.memtable_operations_in_millions; mergeShardsChance = cf_def.merge_shards_chance; @@ -895,6 +913,7 @@ public final class CFMetaData def.setMax_compaction_threshold(cfm.maxCompactionThreshold); def.setRow_cache_save_period_in_seconds(cfm.rowCacheSavePeriodInSeconds); def.setKey_cache_save_period_in_seconds(cfm.keyCacheSavePeriodInSeconds); + def.setRow_cache_keys_to_save(cfm.rowCacheKeysToSave); def.setMemtable_throughput_in_mb(cfm.memtableThroughputInMb); def.setMemtable_operations_in_millions(cfm.memtableOperationsInMillions); def.setMerge_shards_chance(cfm.mergeShardsChance); @@ -941,6 +960,7 @@ public final class CFMetaData def.max_compaction_threshold = cfm.maxCompactionThreshold; def.row_cache_save_period_in_seconds = cfm.rowCacheSavePeriodInSeconds; def.key_cache_save_period_in_seconds = cfm.keyCacheSavePeriodInSeconds; + def.row_cache_keys_to_save = cfm.rowCacheKeysToSave; def.memtable_throughput_in_mb = cfm.memtableThroughputInMb; def.memtable_operations_in_millions = cfm.memtableOperationsInMillions; def.merge_shards_chance = cfm.mergeShardsChance; @@ -986,6 +1006,7 @@ public final class CFMetaData newDef.read_repair_chance = def.getRead_repair_chance(); newDef.replicate_on_write = def.isReplicate_on_write(); newDef.row_cache_save_period_in_seconds = def.getRow_cache_save_period_in_seconds(); + newDef.row_cache_keys_to_save = def.getRow_cache_keys_to_save(); newDef.row_cache_size = def.getRow_cache_size(); newDef.subcomparator_type = def.getSubcomparator_type(); newDef.merge_shards_chance = def.getMerge_shards_chance(); @@ -1120,6 +1141,7 @@ public final class CFMetaData .append("maxCompactionThreshold", maxCompactionThreshold) .append("rowCacheSavePeriodInSeconds", rowCacheSavePeriodInSeconds) .append("keyCacheSavePeriodInSeconds", keyCacheSavePeriodInSeconds) + .append("rowCacheKeysToSave", rowCacheKeysToSave) .append("memtableThroughputInMb", memtableThroughputInMb) .append("memtableOperationsInMillions", memtableOperationsInMillions) .append("mergeShardsChance", mergeShardsChance) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Jul 26 18:42:37 2011 @@ -142,6 +142,7 @@ public class ColumnFamilyStore implement private volatile DefaultDouble memops; private volatile DefaultInteger rowCacheSaveInSeconds; private volatile DefaultInteger keyCacheSaveInSeconds; + private volatile DefaultInteger rowCacheKeysToSave; /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */ public final Lock flushLock = new ReentrantLock(); @@ -195,11 +196,13 @@ public class ColumnFamilyStore implement rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds()); if (!keyCacheSaveInSeconds.isModified()) keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds()); + if (!rowCacheKeysToSave.isModified()) + rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave()); compactionStrategy = metadata.createCompactionStrategyInstance(this); updateCacheSizes(); - scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value()); + scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value()); // figure out what needs to be added and dropped. // future: if/when we have modifiable settings for secondary indexes, they'll need to be handled here. @@ -241,6 +244,7 @@ public class ColumnFamilyStore implement this.memops = new DefaultDouble(metadata.getMemtableOperationsInMillions()); this.rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds()); this.keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds()); + this.rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave()); this.partitioner = partitioner; fileIndexGenerator.set(generation); @@ -542,13 +546,13 @@ public class ColumnFamilyStore implement table.name, columnFamily)); - scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds()); + scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds(), metadata.getRowCacheKeysToSave()); } - public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds) + public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds, int rowCacheKeysToSave) { - keyCache.scheduleSaving(keyCacheSavePeriodInSeconds); - rowCache.scheduleSaving(rowCacheSavePeriodInSeconds); + keyCache.scheduleSaving(keyCacheSavePeriodInSeconds, Integer.MAX_VALUE); + rowCache.scheduleSaving(rowCacheSavePeriodInSeconds, rowCacheKeysToSave); } public AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache() @@ -1985,6 +1989,7 @@ public class ColumnFamilyStore implement - get/set memtime - get/set rowCacheSavePeriodInSeconds - get/set keyCacheSavePeriodInSeconds + - get/set rowCacheKeysToSave */ public AbstractCompactionStrategy getCompactionStrategy() @@ -2056,7 +2061,7 @@ public class ColumnFamilyStore implement throw new RuntimeException("RowCacheSavePeriodInSeconds must be non-negative."); } this.rowCacheSaveInSeconds.set(rcspis); - scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value()); + scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value()); } public int getKeyCacheSavePeriodInSeconds() @@ -2070,7 +2075,17 @@ public class ColumnFamilyStore implement throw new RuntimeException("KeyCacheSavePeriodInSeconds must be non-negative."); } this.keyCacheSaveInSeconds.set(kcspis); - scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value()); + scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value()); + } + + public int getRowCacheKeysToSave() + { + return rowCacheKeysToSave.value(); + } + + public void setRowCacheKeysToSave(int keysToSave) + { + this.rowCacheKeysToSave.set(keysToSave); } // End JMX get/set. Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Tue Jul 26 18:42:37 2011 @@ -231,4 +231,7 @@ public interface ColumnFamilyStoreMBean public int getKeyCacheSavePeriodInSeconds(); public void setKeyCacheSavePeriodInSeconds(int kcspis); + + public int getRowCacheKeysToSave(); + public void setRowCacheKeysToSave(int keysToSave); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Tue Jul 26 18:42:37 2011 @@ -159,6 +159,7 @@ public abstract class Migration assert !StorageService.instance.isClientMode(); assert column != null; MigrationManager.announce(column); + passiveAnnounce(); // keeps gossip in sync w/ what we just told everyone } public final void passiveAnnounce() Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 26 18:42:37 2011 @@ -2216,8 +2216,8 @@ public class StorageService implements I logger_.debug("submitting cache saves"); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - futures.add(cfs.keyCache.submitWrite()); - futures.add(cfs.rowCache.submitWrite()); + futures.add(cfs.keyCache.submitWrite(-1)); + futures.add(cfs.rowCache.submitWrite(cfs.getRowCacheKeysToSave())); } FBUtilities.waitOnFutures(futures); logger_.debug("cache saves completed"); Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Tue Jul 26 18:42:37 2011 @@ -84,7 +84,7 @@ public class KeyCacheTest extends Cleanu } // force the cache to disk - store.keyCache.submitWrite().get(); + store.keyCache.submitWrite(Integer.MAX_VALUE).get(); // empty the cache again to make sure values came from disk store.invalidateKeyCache(); Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java?rev=1151205&r1=1151204&r2=1151205&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java Tue Jul 26 18:42:37 2011 @@ -114,6 +114,19 @@ public class RowCacheTest extends Cleanu @Test public void testRowCacheLoad() throws Exception { + rowCacheLoad(100, 100, Integer.MAX_VALUE); + } + + + @Test + public void testRowCachePartialLoad() throws Exception + { + rowCacheLoad(100, 50, 50); + } + + + public void rowCacheLoad(int totalKeys, int expectedKeys, int keysToSave) throws Exception + { CompactionManager.instance.disableAutoCompaction(); ColumnFamilyStore store = Table.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE); @@ -123,12 +136,12 @@ public class RowCacheTest extends Cleanu assert store.getRowCacheSize() == 0; // insert data and fill the cache - insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100); - readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100); - assert store.getRowCacheSize() == 100; + insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, totalKeys); + readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, totalKeys); + assert store.getRowCacheSize() == totalKeys; // force the cache to disk - store.rowCache.submitWrite().get(); + store.rowCache.submitWrite(keysToSave).get(); // empty the cache again to make sure values came from disk store.invalidateRowCache(); @@ -136,12 +149,28 @@ public class RowCacheTest extends Cleanu // load the cache from disk store.initCaches(); - assert store.getRowCacheSize() == 100; + assert store.getRowCacheSize() == expectedKeys; - for (int i = 0; i < 100; i++) + // If we are loading less than the entire cache back, we can't + // be sure which rows we will get if all rows are equally hot. + int nulls = 0; + int nonNull = 0; + for (int i = 0; i < expectedKeys; i++) { - // verify the correct data was found - assert store.getRawCachedRow(Util.dk("key" + i)).getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i)); + // verify the correct data was found when we expect to get + // back the entire cache. Otherwise only make assertions + // about how many items are read back. + ColumnFamily row = store.getRawCachedRow(Util.dk("key" + i)); + if (expectedKeys == totalKeys) + { + assert row != null; + assert row.getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i)); + } + if (row == null) + nulls++; + else + nonNull++; } + assert nulls + nonNull == expectedKeys; } }