http://git-wip-us.apache.org/repos/asf/cassandra/blob/86e949f3/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java index 60aa07f..059d630 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java @@ -32,10 +32,15 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; import java.util.EnumSet; import java.util.Collections; import java.util.BitSet; import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CfDef"); @@ -62,6 +67,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final org.apache.thrift.protocol.TField BLOOM_FILTER_FP_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("bloom_filter_fp_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)33); private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34); private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37); + private static final org.apache.thrift.protocol.TField POPULATE_IO_CACHE_ON_FLUSH_FIELD_DESC = new org.apache.thrift.protocol.TField("populate_io_cache_on_flush", org.apache.thrift.protocol.TType.BOOL, (short)38); private static final org.apache.thrift.protocol.TField ROW_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)9); private static final org.apache.thrift.protocol.TField KEY_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("key_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)11); private static final org.apache.thrift.protocol.TField ROW_CACHE_SAVE_PERIOD_IN_SECONDS_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_save_period_in_seconds", org.apache.thrift.protocol.TType.I32, (short)19); @@ -72,7 +78,6 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final org.apache.thrift.protocol.TField MERGE_SHARDS_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("merge_shards_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)25); private static final org.apache.thrift.protocol.TField ROW_CACHE_PROVIDER_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_provider", org.apache.thrift.protocol.TType.STRING, (short)27); private static final org.apache.thrift.protocol.TField ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_keys_to_save", org.apache.thrift.protocol.TType.I32, (short)31); - private static final org.apache.thrift.protocol.TField POPULATE_IO_CACHE_ON_FLUSH_FIELD_DESC = new org.apache.thrift.protocol.TField("populate_io_cache_on_flush", org.apache.thrift.protocol.TType.BOOL, (short)38); public String keyspace; // required public String name; // required @@ -331,16 +336,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5; private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 6; private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 7; - private static final int __ROW_CACHE_SIZE_ISSET_ID = 8; - private static final int __KEY_CACHE_SIZE_ISSET_ID = 9; - private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 10; - private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11; - private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 12; - private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 13; - private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 14; - private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 15; - private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 16; - private static final int __POPULATE_IO_CACHE_ISSET_ID = 17; + private static final int __POPULATE_IO_CACHE_ON_FLUSH_ISSET_ID = 8; + private static final int __ROW_CACHE_SIZE_ISSET_ID = 9; + private static final int __KEY_CACHE_SIZE_ISSET_ID = 10; + private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11; + private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 12; + private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 13; + private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 14; + private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 15; + private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 16; + private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 17; private BitSet __isset_bit_vector = new BitSet(18); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; @@ -375,7 +380,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.REPLICATE_ON_WRITE, new org.apache.thrift.meta_data.FieldMetaData("replicate_on_write", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); - tmpMap.put(_Fields.KEY_VALIDATION_CLASS, new org.apache.thrift.meta_data.FieldMetaData("key_validation_class", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.KEY_VALIDATION_CLASS, new org.apache.thrift.meta_data.FieldMetaData("key_validation_class", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.KEY_ALIAS, new org.apache.thrift.meta_data.FieldMetaData("key_alias", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))); @@ -395,9 +400,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); - tmpMap.put(_Fields.POPULATE_IO_CACHE_ON_FLUSH, new org.apache.thrift.meta_data.FieldMetaData("populate_io_cache_on_flush", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.POPULATE_IO_CACHE_ON_FLUSH, new org.apache.thrift.meta_data.FieldMetaData("populate_io_cache_on_flush", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); - tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); tmpMap.put(_Fields.KEY_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("key_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); @@ -429,6 +434,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav this.caching = "keys_only"; this.dclocal_read_repair_chance = 0; + } public CfDef( @@ -485,6 +491,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav } if (other.isSetKey_alias()) { this.key_alias = org.apache.thrift.TBaseHelper.copyBinary(other.key_alias); +; } if (other.isSetCompaction_strategy()) { this.compaction_strategy = other.compaction_strategy; @@ -575,10 +582,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav setBloom_filter_fp_chanceIsSet(false); this.bloom_filter_fp_chance = 0.0; this.caching = "keys_only"; + this.dclocal_read_repair_chance = 0; + setPopulate_io_cache_on_flushIsSet(false); this.populate_io_cache_on_flush = false; - setRow_cache_sizeIsSet(false); this.row_cache_size = 0.0; setKey_cache_sizeIsSet(false); @@ -1154,16 +1162,6 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return this; } - public boolean getPopulate_io_cache_on_flush() { - return this.populate_io_cache_on_flush; - } - - public CfDef setPopulate_io_cache_on_flush(boolean populate_io_cache_on_flush) { - this.populate_io_cache_on_flush = populate_io_cache_on_flush; - setPopulate_io_cache_on_flushIsSet(true); - return this; - } - public void unsetDclocal_read_repair_chance() { __isset_bit_vector.clear(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID); } @@ -1177,17 +1175,27 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav __isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value); } + public boolean isPopulate_io_cache_on_flush() { + return this.populate_io_cache_on_flush; + } + + public CfDef setPopulate_io_cache_on_flush(boolean populate_io_cache_on_flush) { + this.populate_io_cache_on_flush = populate_io_cache_on_flush; + setPopulate_io_cache_on_flushIsSet(true); + return this; + } + public void unsetPopulate_io_cache_on_flush() { - __isset_bit_vector.clear(__POPULATE_IO_CACHE_ISSET_ID); + __isset_bit_vector.clear(__POPULATE_IO_CACHE_ON_FLUSH_ISSET_ID); } /** Returns true if field populate_io_cache_on_flush is set (has been assigned a value) and false otherwise */ public boolean isSetPopulate_io_cache_on_flush() { - return __isset_bit_vector.get(__POPULATE_IO_CACHE_ISSET_ID); + return __isset_bit_vector.get(__POPULATE_IO_CACHE_ON_FLUSH_ISSET_ID); } public void setPopulate_io_cache_on_flushIsSet(boolean value) { - __isset_bit_vector.set(__POPULATE_IO_CACHE_ISSET_ID, value); + __isset_bit_vector.set(__POPULATE_IO_CACHE_ON_FLUSH_ISSET_ID, value); } /** @@ -1663,7 +1671,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav if (value == null) { unsetPopulate_io_cache_on_flush(); } else { - setPopulate_io_cache_on_flush((Boolean) value); + setPopulate_io_cache_on_flush((Boolean)value); } break; @@ -1819,7 +1827,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return Double.valueOf(getDclocal_read_repair_chance()); case POPULATE_IO_CACHE_ON_FLUSH: - return Boolean.valueOf(getPopulate_io_cache_on_flush()); + return Boolean.valueOf(isPopulate_io_cache_on_flush()); case ROW_CACHE_SIZE: return Double.valueOf(getRow_cache_size()); @@ -2146,10 +2154,10 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav boolean this_present_populate_io_cache_on_flush = true && this.isSetPopulate_io_cache_on_flush(); boolean that_present_populate_io_cache_on_flush = true && that.isSetPopulate_io_cache_on_flush(); if (this_present_populate_io_cache_on_flush || that_present_populate_io_cache_on_flush) { - if (!(this_present_populate_io_cache_on_flush && that_present_populate_io_cache_on_flush)) - return false; - if (this.populate_io_cache_on_flush != that.populate_io_cache_on_flush) - return false; + if (!(this_present_populate_io_cache_on_flush && that_present_populate_io_cache_on_flush)) + return false; + if (this.populate_io_cache_on_flush != that.populate_io_cache_on_flush) + return false; } boolean this_present_row_cache_size = true && this.isSetRow_cache_size(); @@ -2362,7 +2370,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav boolean present_populate_io_cache_on_flush = true && (isSetPopulate_io_cache_on_flush()); builder.append(present_populate_io_cache_on_flush); if (present_populate_io_cache_on_flush) - builder.append(populate_io_cache_on_flush); + builder.append(populate_io_cache_on_flush); boolean present_row_cache_size = true && (isSetRow_cache_size()); builder.append(present_row_cache_size); @@ -2973,10 +2981,10 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav if (field.type == org.apache.thrift.protocol.TType.BOOL) { this.populate_io_cache_on_flush = iprot.readBool(); setPopulate_io_cache_on_flushIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } - break; + break; case 9: // ROW_CACHE_SIZE if (field.type == org.apache.thrift.protocol.TType.DOUBLE) { this.row_cache_size = iprot.readDouble(); @@ -3479,9 +3487,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav } if (isSetPopulate_io_cache_on_flush()) { if (!first) sb.append(", "); - sb.append("populate_io_cache_on_flush:"); - sb.append(this.populate_io_cache_on_flush); - first = false; + sb.append("populate_io_cache_on_flush:"); + sb.append(this.populate_io_cache_on_flush); + first = false; } if (isSetRow_cache_size()) { if (!first) sb.append(", ");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86e949f3/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java index 1315a5b..98a7ce9 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java @@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory; public class Constants { - public static final String VERSION = "19.36.1"; + public static final String VERSION = "19.36.2"; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/86e949f3/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 8fd66ab..cca2734 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@ -19,12 +19,7 @@ package org.apache.cassandra.hadoop; import java.io.IOException; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -130,6 +125,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); logger.debug("partitioner is " + partitioner); + // cannonical ranges, split into pieces, fetching the splits in parallel ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); List<InputSplit> splits = new ArrayList<InputSplit>(); @@ -326,7 +322,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< List<TokenRange> map; try { - map = client.describe_ring(ConfigHelper.getInputKeyspace(conf)); + map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf)); } catch (InvalidRequestException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/86e949f3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c9326ae..b85f478 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -34,6 +34,7 @@ import javax.management.Notification; import javax.management.NotificationBroadcasterSupport; import javax.management.ObjectName; +import com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.AtomicDouble; @@ -1080,15 +1081,61 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace) { + return getRangeToAddressMap(keyspace, tokenMetadata.sortedTokens()); + } + + public Map<Range<Token>, List<InetAddress>> getRangeToAddressMapInLocalDC(String keyspace) + { + Predicate<InetAddress> isLocalDC = new Predicate<InetAddress>() + { + public boolean apply(InetAddress address) + { + return isLocalDC(address); + } + }; + + Map<Range<Token>, List<InetAddress>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); + Map<Range<Token>, List<InetAddress>> filteredMap = Maps.newHashMap(); + for (Map.Entry<Range<Token>, List<InetAddress>> entry : origMap.entrySet()) + { + List<InetAddress> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC)); + filteredMap.put(entry.getKey(), endpointsInLocalDC); + } + + return filteredMap; + } + + private List<Token> getTokensInLocalDC() + { + List<Token> filteredTokens = Lists.newArrayList(); + for (Token token : tokenMetadata.sortedTokens()) + { + InetAddress endpoint = tokenMetadata.getEndpoint(token); + if (isLocalDC(endpoint)) + filteredTokens.add(token); + } + return filteredTokens; + } + + private boolean isLocalDC(InetAddress targetHost) + { + String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); + String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + return remoteDC.equals(localDC); + } + + private Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens) + { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system table. if (keyspace == null) keyspace = Schema.instance.getNonSystemTables().get(0); - List<Range<Token>> ranges = getAllRanges(tokenMetadata.sortedTokens()); + List<Range<Token>> ranges = getAllRanges(sortedTokens); return constructRangeToEndpointMap(keyspace, ranges); } + /** * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility * @@ -1126,13 +1173,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public List<TokenRange> describeRing(String keyspace) throws InvalidRequestException { + return describeRing(keyspace, false); + } + + /** + * The same as {@code describeRing(String)} but considers only the part of the ring formed by nodes in the local DC. + */ + public List<TokenRange> describeLocalRing(String keyspace) throws InvalidRequestException + { + return describeRing(keyspace, true); + } + + private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException + { if (keyspace == null || Table.open(keyspace).getReplicationStrategy() instanceof LocalStrategy) throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace); List<TokenRange> ranges = new ArrayList<TokenRange>(); Token.TokenFactory tf = getPartitioner().getTokenFactory(); - for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) + Map<Range<Token>, List<InetAddress>> rangeToAddressMap = + includeOnlyLocalDC + ? getRangeToAddressMapInLocalDC(keyspace) + : getRangeToAddressMap(keyspace); + + for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet()) { Range range = entry.getKey(); List<InetAddress> addresses = entry.getValue(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86e949f3/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 883ab5a..6a35285 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1183,6 +1183,19 @@ public class CassandraServer implements Cassandra.Iface } } + @Override + public List<TokenRange> describe_local_ring(String keyspace) throws InvalidRequestException, TException + { + try + { + return StorageService.instance.describeLocalRing(keyspace); + } + catch (RequestValidationException e) + { + throw ThriftConversion.toThrift(e); + } + } + public Map<String, String> describe_token_map() throws InvalidRequestException { return StorageService.instance.getTokenToEndpointMap();