Updated Branches: refs/heads/trunk 449573622 -> 4f2f97944
Merge branch 'cassandra-1.2' into trunk Conflicts: src/java/org/apache/cassandra/service/StorageProxy.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f2f9794 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f2f9794 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f2f9794 Branch: refs/heads/trunk Commit: 4f2f979446b8ccb071d11f81c59bf09ffdefb871 Parents: 4495736 5267112 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Jan 16 18:48:11 2013 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Jan 16 18:48:11 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 12 + .../cassandra/config/DatabaseDescriptor.java | 19 ++ .../cassandra/config/ReadRepairDecision.java | 23 ++ .../org/apache/cassandra/db/ConsistencyLevel.java | 174 ++++++++++++++- .../org/apache/cassandra/dht/AbstractBounds.java | 2 + src/java/org/apache/cassandra/dht/Bounds.java | 5 + .../org/apache/cassandra/dht/ExcludingBounds.java | 5 + .../cassandra/dht/IncludingExcludingBounds.java | 5 + src/java/org/apache/cassandra/dht/Range.java | 5 + .../cassandra/locator/AbstractEndpointSnitch.java | 24 ++ .../locator/AbstractReplicationStrategy.java | 28 ++- .../cassandra/locator/DynamicEndpointSnitch.java | 30 +++ .../apache/cassandra/locator/IEndpointSnitch.java | 8 +- .../apache/cassandra/locator/LocalStrategy.java | 1 + .../cassandra/locator/NetworkTopologyStrategy.java | 1 + .../locator/OldNetworkTopologyStrategy.java | 1 + .../apache/cassandra/locator/SimpleStrategy.java | 1 + .../service/AbstractWriteResponseHandler.java | 38 ++- .../cassandra/service/DatacenterReadCallback.java | 104 --------- .../DatacenterSyncWriteResponseHandler.java | 45 +---- .../service/DatacenterWriteResponseHandler.java | 28 +-- .../org/apache/cassandra/service/ReadCallback.java | 93 ++------- .../org/apache/cassandra/service/StorageProxy.java | 121 ++++++++--- .../apache/cassandra/service/StorageService.java | 6 +- .../cassandra/service/WriteResponseHandler.java | 45 +--- .../ReplicationStrategyEndpointCacheTest.java | 2 +- .../cassandra/locator/SimpleStrategyTest.java | 10 - 28 files changed, 481 insertions(+), 356 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 9e0557f,7c33203..818162f --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -87,9 -86,9 +87,10 @@@ public class DatabaseDescripto private static long keyCacheSizeInMB; private static IRowCacheProvider rowCacheProvider; + private static IAllocator memoryAllocator; private static String localDC; + private static Comparator<InetAddress> localComparator; /** * Inspect the classpath to find storage configuration file http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 3809580,df09171..467a382 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -321,8 -321,13 +322,13 @@@ public class StorageProxy implements St private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) { RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid)); - rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros()); + rm.delete(SystemTable.BATCHLOG_CF, FBUtilities.timestampMicros()); - AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE); + AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, + Collections.<InetAddress>emptyList(), + ConsistencyLevel.ANY, + Table.open(Table.SYSTEM_KS), + null, + WriteType.SIMPLE); updateBatchlog(rm, endpoints, handler); } @@@ -1096,10 -1120,54 +1121,53 @@@ int cql3RowCount = 0; rows = new ArrayList<Row>(); List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range); - for (AbstractBounds<RowPosition> range : ranges) + int i = 0; + AbstractBounds<RowPosition> nextRange = null; + List<InetAddress> nextEndpoints = null; + List<InetAddress> nextFilteredEndpoints = null; + while (i < ranges.size()) { + AbstractBounds<RowPosition> range = nextRange == null + ? ranges.get(i) + : nextRange; + List<InetAddress> liveEndpoints = nextEndpoints == null + ? getLiveSortedEndpoints(table, range.right) + : nextEndpoints; + List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null + ? consistency_level.filterForQuery(table, liveEndpoints) + : nextFilteredEndpoints; + ++i; + + // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take + // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges + // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. + while (i < ranges.size()) + { + nextRange = ranges.get(i); + nextEndpoints = getLiveSortedEndpoints(table, nextRange.right); + nextFilteredEndpoints = consistency_level.filterForQuery(table, liveEndpoints); + + List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints); + + // Check if there is enough endpoint for the merge to be possible. + if (!consistency_level.isSufficientLiveNodes(table, merged)) + break; + + List<InetAddress> filteredMerged = consistency_level.filterForQuery(table, merged); + + // Estimate whether merging will be a win or not + if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints)) + break; + + // If we get there, merge this range and the next one + range = range.withNewRight(nextRange.right); + liveEndpoints = merged; + filteredEndpoints = filteredMerged; + ++i; + } + RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace, command.column_family, - command.super_column, commandPredicate, range, command.row_filter, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/service/StorageService.java ----------------------------------------------------------------------