DC-local read repair: push the read repair setting down to the DC level. Patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-2506
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e6a4c01 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e6a4c01 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e6a4c01 Branch: refs/heads/cassandra-1.0 Commit: 4e6a4c016a9efe60c053318b4da81e6a90b9588b Parents: 12d26f8 Author: Vijay Parthasarathy <vijay2...@gmail.com> Authored: Fri Feb 10 10:51:04 2012 -0800 Committer: Vijay Parthasarathy <vijay2...@gmail.com> Committed: Fri Feb 10 10:51:04 2012 -0800 ---------------------------------------------------------------------- interface/cassandra.thrift | 1 + .../org/apache/cassandra/thrift/Cassandra.java | 6 +- .../org/apache/cassandra/thrift/CfDef.java | 96 ++++++++++++++- src/avro/internode.genavro | 1 + src/java/org/apache/cassandra/cli/CliClient.java | 11 ++ .../org/apache/cassandra/config/CFMetaData.java | 23 ++++ .../apache/cassandra/cql/AlterTableStatement.java | 1 + src/java/org/apache/cassandra/cql/CFPropDefs.java | 2 + .../cassandra/cql/CreateColumnFamilyStatement.java | 1 + src/java/org/apache/cassandra/cql3/CFPropDefs.java | 2 + .../cql3/statements/AlterTableStatement.java | 1 + .../statements/CreateColumnFamilyStatement.java | 1 + .../cassandra/service/DatacenterReadCallback.java | 4 - .../org/apache/cassandra/service/ReadCallback.java | 43 +++++-- .../org/apache/cassandra/cli/CliHelp.yaml | 14 ++ 15 files changed, 187 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index 5fd2af6..a2275b6 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -417,6 +417,7 @@ struct CfDef { 34: optional string caching="keys_only", 35: optional list<binary> column_aliases, 
36: optional binary value_alias, + 37: optional double dclocal_read_repair_chance = 0.0, } /* describes a keyspace. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java index 58f5ac5..3cfe58c 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java @@ -8526,6 +8526,8 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -26538,8 +26540,6 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -34148,8 +34148,6 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. 
- __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java index 8593794..0fe3a7d 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java @@ -69,6 +69,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34); private static final org.apache.thrift.protocol.TField COLUMN_ALIASES_FIELD_DESC = new org.apache.thrift.protocol.TField("column_aliases", org.apache.thrift.protocol.TType.LIST, (short)35); private static final org.apache.thrift.protocol.TField VALUE_ALIAS_FIELD_DESC = new org.apache.thrift.protocol.TField("value_alias", org.apache.thrift.protocol.TType.STRING, (short)36); + private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37); public String keyspace; // required public String name; // required @@ -94,6 +95,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav public String caching; // required public List<ByteBuffer> column_aliases; // required public ByteBuffer value_alias; // required + public double dclocal_read_repair_chance; // required /** The set of fields this 
struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -120,7 +122,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav BLOOM_FILTER_FP_CHANCE((short)33, "bloom_filter_fp_chance"), CACHING((short)34, "caching"), COLUMN_ALIASES((short)35, "column_aliases"), - VALUE_ALIAS((short)36, "value_alias"); + VALUE_ALIAS((short)36, "value_alias"), + DCLOCAL_READ_REPAIR_CHANCE((short)37, "dclocal_read_repair_chance"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -183,6 +186,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return COLUMN_ALIASES; case 36: // VALUE_ALIAS return VALUE_ALIAS; + case 37: // DCLOCAL_READ_REPAIR_CHANCE + return DCLOCAL_READ_REPAIR_CHANCE; default: return null; } @@ -231,7 +236,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5; private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 6; private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 7; - private BitSet __isset_bit_vector = new BitSet(8); + private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 8; + private BitSet __isset_bit_vector = new BitSet(9); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -290,6 +296,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)))); tmpMap.put(_Fields.VALUE_ALIAS, new org.apache.thrift.meta_data.FieldMetaData("value_alias", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))); + tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new 
org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap); } @@ -303,6 +311,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav this.caching = "keys_only"; + this.dclocal_read_repair_chance = 0; + } public CfDef( @@ -412,6 +422,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav this.value_alias = org.apache.thrift.TBaseHelper.copyBinary(other.value_alias); ; } + this.dclocal_read_repair_chance = other.dclocal_read_repair_chance; } public CfDef deepCopy() { @@ -455,6 +466,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav this.column_aliases = null; this.value_alias = null; + this.dclocal_read_repair_chance = 0; + } public String getKeyspace() { @@ -1097,6 +1110,29 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav } } + public double getDclocal_read_repair_chance() { + return this.dclocal_read_repair_chance; + } + + public CfDef setDclocal_read_repair_chance(double dclocal_read_repair_chance) { + this.dclocal_read_repair_chance = dclocal_read_repair_chance; + setDclocal_read_repair_chanceIsSet(true); + return this; + } + + public void unsetDclocal_read_repair_chance() { + __isset_bit_vector.clear(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID); + } + + /** Returns true if field dclocal_read_repair_chance is set (has been assigned a value) and false otherwise */ + public boolean isSetDclocal_read_repair_chance() { + return __isset_bit_vector.get(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID); + } + + public void setDclocal_read_repair_chanceIsSet(boolean value) { + __isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value); + } + public void 
setFieldValue(_Fields field, Object value) { switch (field) { case KEYSPACE: @@ -1291,6 +1327,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav } break; + case DCLOCAL_READ_REPAIR_CHANCE: + if (value == null) { + unsetDclocal_read_repair_chance(); + } else { + setDclocal_read_repair_chance((Double)value); + } + break; + } } @@ -1368,6 +1412,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav case VALUE_ALIAS: return getValue_alias(); + case DCLOCAL_READ_REPAIR_CHANCE: + return Double.valueOf(getDclocal_read_repair_chance()); + } throw new IllegalStateException(); } @@ -1427,6 +1474,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return isSetColumn_aliases(); case VALUE_ALIAS: return isSetValue_alias(); + case DCLOCAL_READ_REPAIR_CHANCE: + return isSetDclocal_read_repair_chance(); } throw new IllegalStateException(); } @@ -1660,6 +1709,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return false; } + boolean this_present_dclocal_read_repair_chance = true && this.isSetDclocal_read_repair_chance(); + boolean that_present_dclocal_read_repair_chance = true && that.isSetDclocal_read_repair_chance(); + if (this_present_dclocal_read_repair_chance || that_present_dclocal_read_repair_chance) { + if (!(this_present_dclocal_read_repair_chance && that_present_dclocal_read_repair_chance)) + return false; + if (this.dclocal_read_repair_chance != that.dclocal_read_repair_chance) + return false; + } + return true; } @@ -1787,6 +1845,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav if (present_value_alias) builder.append(value_alias); + boolean present_dclocal_read_repair_chance = true && (isSetDclocal_read_repair_chance()); + builder.append(present_dclocal_read_repair_chance); + if (present_dclocal_read_repair_chance) + builder.append(dclocal_read_repair_chance); + return builder.toHashCode(); } @@ 
-2038,6 +2101,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return lastComparison; } } + lastComparison = Boolean.valueOf(isSetDclocal_read_repair_chance()).compareTo(typedOther.isSetDclocal_read_repair_chance()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetDclocal_read_repair_chance()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.dclocal_read_repair_chance, typedOther.dclocal_read_repair_chance); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -2276,6 +2349,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 37: // DCLOCAL_READ_REPAIR_CHANCE + if (field.type == org.apache.thrift.protocol.TType.DOUBLE) { + this.dclocal_read_repair_chance = iprot.readDouble(); + setDclocal_read_repair_chanceIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -2469,6 +2550,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav oprot.writeFieldEnd(); } } + if (isSetDclocal_read_repair_chance()) { + oprot.writeFieldBegin(DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC); + oprot.writeDouble(this.dclocal_read_repair_chance); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -2681,6 +2767,12 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav } first = false; } + if (isSetDclocal_read_repair_chance()) { + if (!first) sb.append(", "); + sb.append("dclocal_read_repair_chance:"); + sb.append(this.dclocal_read_repair_chance); + first = false; + } sb.append(")"); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/avro/internode.genavro ---------------------------------------------------------------------- diff 
--git a/src/avro/internode.genavro b/src/avro/internode.genavro index 36c2cba..d060d6e 100644 --- a/src/avro/internode.genavro +++ b/src/avro/internode.genavro @@ -69,6 +69,7 @@ protocol InterNode { union { null, string } caching = null; union { null, array<bytes> } column_aliases = null; union { null, bytes } value_alias = null; + union { double, null } dclocal_read_repair_chance = 0.0; } @aliases(["org.apache.cassandra.config.avro.KsDef"]) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cli/CliClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java index c132a77..05409e5 100644 --- a/src/java/org/apache/cassandra/cli/CliClient.java +++ b/src/java/org/apache/cassandra/cli/CliClient.java @@ -124,6 +124,7 @@ public class CliClient KEYS_CACHED, KEY_CACHE_SAVE_PERIOD, READ_REPAIR_CHANCE, + DCLOCAL_READ_REPAIR_CHANCE, GC_GRACE, COLUMN_METADATA, MEMTABLE_OPERATIONS, @@ -1196,6 +1197,14 @@ public class CliClient cfDef.setRead_repair_chance(chance); break; + case DCLOCAL_READ_REPAIR_CHANCE: + double localChance = Double.parseDouble(mValue); + + if (localChance < 0 || localChance > 1) + throw new RuntimeException("Error: dclocal_read_repair_chance must be between 0 and 1."); + + cfDef.setDclocal_read_repair_chance(localChance); + break; case GC_GRACE: cfDef.setGc_grace_seconds(Integer.parseInt(mValue)); break; @@ -1622,6 +1631,7 @@ public class CliClient normaliseType(cfDef.key_validation_class, "org.apache.cassandra.db.marshal")); writeAttr(output, false, "read_repair_chance", cfDef.read_repair_chance); + writeAttr(output, false, "dclocal_read_repair_chance", cfDef.dclocal_read_repair_chance); writeAttr(output, false, "gc_grace", cfDef.gc_grace_seconds); writeAttr(output, false, "min_compaction_threshold", cfDef.min_compaction_threshold); writeAttr(output, false, "max_compaction_threshold", 
cfDef.max_compaction_threshold); @@ -1975,6 +1985,7 @@ public class CliClient sessionState.out.printf(" GC grace seconds: %s%n", cf_def.gc_grace_seconds); sessionState.out.printf(" Compaction min/max thresholds: %s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold); sessionState.out.printf(" Read repair chance: %s%n", cf_def.read_repair_chance); + sessionState.out.printf(" DC Local Read repair chance: %s%n", cf_def.dclocal_read_repair_chance); sessionState.out.printf(" Replicate on write: %s%n", cf_def.replicate_on_write); sessionState.out.printf(" Caching: %s%n", cf_def.caching); sessionState.out.printf(" Bloom Filter FP chance: %s%n", cf_def.isSetBloom_filter_fp_chance() ? cf_def.bloom_filter_fp_chance : "default"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index d879a2c..defa6cf 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -63,6 +63,7 @@ public final class CFMetaData private static Logger logger = LoggerFactory.getLogger(CFMetaData.class); public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1; + public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0; public final static boolean DEFAULT_REPLICATE_ON_WRITE = true; public final static int DEFAULT_GC_GRACE_SECONDS = 864000; public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4; @@ -148,6 +149,7 @@ public final class CFMetaData //OPTIONAL private String comment; // default none, for humans only private double readRepairChance; // default 1.0 (always), chance [0.0,1.0] of read repair + private double dcLocalReadRepairChance; // default 0.0 private boolean replicateOnWrite; // default false private int gcGraceSeconds; // default 
864000 (ten days) private AbstractType<?> defaultValidator; // default BytesType (no-op), use comparator types @@ -176,6 +178,7 @@ public final class CFMetaData public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;} public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;} + public CFMetaData dclocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;} public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;} public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;} public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; updateCfDef(); return this;} @@ -229,6 +232,7 @@ public final class CFMetaData { // Set a bunch of defaults readRepairChance = DEFAULT_READ_REPAIR_CHANCE; + dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE; replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE; gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS; minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD; @@ -264,6 +268,7 @@ public final class CFMetaData return newCFMD.comment(comment) .readRepairChance(0) + .dclocalReadRepairChance(0) .gcGraceSeconds(0) .mergeShardsChance(0.0); } @@ -273,6 +278,7 @@ public final class CFMetaData return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, null) .keyValidator(info.getValidator()) .readRepairChance(0.0) + .dclocalReadRepairChance(0.0) .gcGraceSeconds(parent.gcGraceSeconds) .minCompactionThreshold(parent.minCompactionThreshold) .maxCompactionThreshold(parent.maxCompactionThreshold); @@ -294,6 +300,7 @@ public final class CFMetaData { return newCFMD.comment(oldCFMD.comment) .readRepairChance(oldCFMD.readRepairChance) + .dclocalReadRepairChance(oldCFMD.dcLocalReadRepairChance) .replicateOnWrite(oldCFMD.replicateOnWrite) .gcGraceSeconds(oldCFMD.gcGraceSeconds) .defaultValidator(oldCFMD.defaultValidator) @@ -407,6 
+414,7 @@ public final class CFMetaData return newCFMD.comment(cf.comment.toString()) .readRepairChance(cf.read_repair_chance) + .dclocalReadRepairChance(cf.dclocal_read_repair_chance) .replicateOnWrite(cf.replicate_on_write) .gcGraceSeconds(cf.gc_grace_seconds) .defaultValidator(validator) @@ -439,6 +447,11 @@ public final class CFMetaData return readRepairChance; } + public double getDcLocalReadRepair() + { + return dcLocalReadRepairChance; + } + public double getMergeShardsChance() { return mergeShardsChance; @@ -539,6 +552,7 @@ public final class CFMetaData .append(subcolumnComparator, rhs.subcolumnComparator) .append(comment, rhs.comment) .append(readRepairChance, rhs.readRepairChance) + .append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance) .append(replicateOnWrite, rhs.replicateOnWrite) .append(gcGraceSeconds, rhs.gcGraceSeconds) .append(defaultValidator, rhs.defaultValidator) @@ -569,6 +583,7 @@ public final class CFMetaData .append(subcolumnComparator) .append(comment) .append(readRepairChance) + .append(dcLocalReadRepairChance) .append(replicateOnWrite) .append(gcGraceSeconds) .append(defaultValidator) @@ -625,6 +640,8 @@ public final class CFMetaData put(CompressionParameters.SSTABLE_COMPRESSION, SnappyCompressor.class.getCanonicalName()); }}); } + if (!cf_def.isSetDclocal_read_repair_chance()) + cf_def.setDclocal_read_repair_chance(CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE); } public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) throws InvalidRequestException, ConfigurationException @@ -660,6 +677,8 @@ public final class CFMetaData newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance); if (cf_def.isSetCaching()) newCFMD.caching(Caching.fromString(cf_def.caching)); + if (cf_def.isSetDclocal_read_repair_chance()) + newCFMD.dclocalReadRepairChance(cf_def.dclocal_read_repair_chance); CompressionParameters cp = CompressionParameters.create(cf_def.compression_options); @@ -740,6 +759,8 @@ public final class CFMetaData 
comment = enforceCommentNotNull(cf_def.comment); readRepairChance = cf_def.read_repair_chance; + if (cf_def.isSetDclocal_read_repair_chance()) + dcLocalReadRepairChance = cf_def.dclocal_read_repair_chance; replicateOnWrite = cf_def.replicate_on_write; gcGraceSeconds = cf_def.gc_grace_seconds; defaultValidator = TypeParser.parse(cf_def.default_validation_class); @@ -870,6 +891,7 @@ public final class CFMetaData } def.setComment(enforceCommentNotNull(comment)); def.setRead_repair_chance(readRepairChance); + def.setDclocal_read_repair_chance(dcLocalReadRepairChance); def.setReplicate_on_write(replicateOnWrite); def.setGc_grace_seconds(gcGraceSeconds); def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString()); @@ -1220,6 +1242,7 @@ public final class CFMetaData .append("subcolumncomparator", subcolumnComparator) .append("comment", comment) .append("readRepairChance", readRepairChance) + .append("dclocalReadRepairChance", dcLocalReadRepairChance) .append("replicateOnWrite", replicateOnWrite) .append("gcGraceSeconds", gcGraceSeconds) .append("defaultValidator", defaultValidator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java index 5ca92bb..0557397 100644 --- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java @@ -182,6 +182,7 @@ public class AlterTableStatement } cfDef.read_repair_chance = cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfDef.read_repair_chance); + cfDef.dclocal_read_repair_chance = cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfDef.dclocal_read_repair_chance); cfDef.gc_grace_seconds = cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, 
cfDef.gc_grace_seconds); cfDef.replicate_on_write = cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfDef.replicate_on_write); cfDef.min_compaction_threshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfDef.min_compaction_threshold); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java index 896915a..8bcbcf2 100644 --- a/src/java/org/apache/cassandra/cql/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java @@ -44,6 +44,7 @@ public class CFPropDefs { public static final String KW_COMPARATOR = "comparator"; public static final String KW_COMMENT = "comment"; public static final String KW_READREPAIRCHANCE = "read_repair_chance"; + public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance"; public static final String KW_GCGRACESECONDS = "gc_grace_seconds"; public static final String KW_DEFAULTVALIDATION = "default_validation"; public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold"; @@ -81,6 +82,7 @@ public class CFPropDefs { keywords.add(KW_COMPARATOR); keywords.add(KW_COMMENT); keywords.add(KW_READREPAIRCHANCE); + keywords.add(KW_DCLOCALREADREPAIRCHANCE); keywords.add(KW_GCGRACESECONDS); keywords.add(KW_DEFAULTVALIDATION); keywords.add(KW_MINCOMPACTIONTHRESHOLD); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java index 960cc9d..062cd90 100644 --- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java +++ 
b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java @@ -177,6 +177,7 @@ public class CreateColumnFamilyStatement newCFMD.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT)) .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE)) + .dclocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE)) .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE)) .gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) .defaultValidator(cfProps.getValidator()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql3/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java index 63f74b0..7ac2167 100644 --- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java @@ -40,6 +40,7 @@ public class CFPropDefs public static final String KW_COMMENT = "comment"; public static final String KW_READREPAIRCHANCE = "read_repair_chance"; + public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance"; public static final String KW_GCGRACESECONDS = "gc_grace_seconds"; public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold"; public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold"; @@ -75,6 +76,7 @@ public class CFPropDefs keywords.add(KW_COMMENT); keywords.add(KW_READREPAIRCHANCE); + keywords.add(KW_DCLOCALREADREPAIRCHANCE); keywords.add(KW_GCGRACESECONDS); keywords.add(KW_MINCOMPACTIONTHRESHOLD); keywords.add(KW_MAXCOMPACTIONTHRESHOLD); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java index 4114773..368eb6d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java @@ -154,6 +154,7 @@ public class AlterTableStatement extends SchemaAlteringStatement } cfDef.read_repair_chance = cfProps.getDouble(CFPropDefs.KW_READREPAIRCHANCE, cfDef.read_repair_chance); + cfDef.dclocal_read_repair_chance = cfProps.getDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfDef.dclocal_read_repair_chance); cfDef.gc_grace_seconds = cfProps.getInt(CFPropDefs.KW_GCGRACESECONDS, cfDef.gc_grace_seconds); cfDef.replicate_on_write = cfProps.getBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfDef.replicate_on_write); cfDef.min_compaction_threshold = cfProps.getInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfDef.min_compaction_threshold); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java index 44d187c..767437e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java @@ -107,6 +107,7 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement newCFMD.comment(properties.get(CFPropDefs.KW_COMMENT)) .readRepairChance(properties.getDouble(CFPropDefs.KW_READREPAIRCHANCE, 
CFMetaData.DEFAULT_READ_REPAIR_CHANCE)) + .dclocalReadRepairChance(properties.getDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE)) .replicateOnWrite(properties.getBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE)) .gcGraceSeconds(properties.getInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) .defaultValidator(defaultValidator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/service/DatacenterReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java index eaca5ef..00540ca 100644 --- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java +++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java @@ -22,7 +22,6 @@ package org.apache.cassandra.service; import java.net.InetAddress; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -35,15 +34,12 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.Message; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.UnavailableException; -import org.apache.cassandra.utils.FBUtilities; /** * Datacenter Quorum response handler blocks for a quorum of responses from the local DC */ public class DatacenterReadCallback<T> extends ReadCallback<T> { - private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - private static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>() { public int compare(InetAddress endpoint1, InetAddress endpoint2) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index b02c5cd..3f30a2d 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -37,6 +37,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Table; +import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -46,10 +47,15 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SimpleCondition; import org.apache.cassandra.utils.WrappedRunnable; +import com.google.common.collect.Lists; + public class ReadCallback<T> implements IAsyncCallback { protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); + protected static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + protected static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); + public final IResponseResolver<T> resolver; protected final SimpleCondition condition = new SimpleCondition(); private final long startTime; @@ -67,15 +73,10 @@ public class ReadCallback<T> implements IAsyncCallback this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace()); this.resolver = resolver; this.startTime = System.currentTimeMillis(); - boolean repair = randomlyReadRepair(); sortForConsistencyLevel(endpoints); - this.endpoints = repair || resolver instanceof RowRepairResolver - ? 
endpoints - : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); - + this.endpoints = resolver instanceof RowRepairResolver ? endpoints : filterEndpoints(endpoints); if (logger.isDebugEnabled()) - logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s", - blockfor, repair, StringUtils.join(this.endpoints, ","))); + logger.debug(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ","))); } /** @@ -89,7 +90,7 @@ public class ReadCallback<T> implements IAsyncCallback // no-op except in DRC } - private boolean randomlyReadRepair() + private List<InetAddress> filterEndpoints(List<InetAddress> ep) { if (resolver instanceof RowDigestResolver) { @@ -97,10 +98,32 @@ public class ReadCallback<T> implements IAsyncCallback String table = ((RowDigestResolver) resolver).table; String columnFamily = ((ReadCommand) command).getColumnFamilyName(); CFMetaData cfmd = Schema.instance.getTableMetaData(table).get(columnFamily); - return cfmd.getReadRepairChance() > FBUtilities.threadLocalRandom().nextDouble(); + double chance = FBUtilities.threadLocalRandom().nextDouble(); + + // if global repair then just return all the ep's + if (cfmd.getReadRepairChance() > chance) + return ep; + + // if local repair then just return localDC ep's + if (cfmd.getDcLocalReadRepair() > chance) + { + List<InetAddress> local = Lists.newArrayList(); + List<InetAddress> other = Lists.newArrayList(); + for (InetAddress add : ep) + { + if (snitch.getDatacenter(add).equals(localdc)) + local.add(add); + else + other.add(add); + } + // check if blockfor more than we have localep's + if (local.size() < blockfor) + local.addAll(other.subList(0, Math.min(blockfor - local.size(), other.size()))); + return local; + } } // we don't read repair on range scans - return false; + return ep.subList(0, Math.min(ep.size(), blockfor)); } public T get() throws TimeoutException, DigestMismatchException, IOException 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/resources/org/apache/cassandra/cli/CliHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/cli/CliHelp.yaml b/src/resources/org/apache/cassandra/cli/CliHelp.yaml index 873cc41..25a9f59 100644 --- a/src/resources/org/apache/cassandra/cli/CliHelp.yaml +++ b/src/resources/org/apache/cassandra/cli/CliHelp.yaml @@ -665,6 +665,20 @@ commands: will not have any latency information from all the replicas to recognize when one is performing worse than usual. + - dclocal_read_repair_chance: Probability (0.0-1.0) with which to + perform read repairs against the nodes in the local data-center. If + this is lower than read_repair_chance, it will be ignored. + + Example: + update column family Standard2 + with read_repair_chance=0.1 + and dclocal_read_repair_chance=0.5; + + For 10 read queries, on average 1 will do read repair on all replicas (and + thus in particular on all replicas of the local DC), 4 will only do + read repair on replicas of the local DC and 5 will not do any read + repair. + - subcomparator: Validator to use to validate and compare sub column names in this column family. Only applied to Super column families. Default is BytesType, which is a straight forward lexical comparison of the bytes in