Author: jbellis
Date: Wed Jun 30 05:48:24 2010
New Revision: 959197

URL: http://svn.apache.org/viewvc?rev=959197&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/.rat-excludes
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/build.xml
    cassandra/trunk/debian/changelog
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-958101
+/cassandra/branches/cassandra-0.6:922689-958811
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/trunk/.rat-excludes
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/.rat-excludes?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/.rat-excludes (original)
+++ cassandra/trunk/.rat-excludes Wed Jun 30 05:48:24 2010
@@ -12,3 +12,5 @@ src/gen-java/**
 build/**
 lib/licenses/*.txt
 .settings/**
+contrib/pig/example-script.pig
+contrib/redhat/cassandra

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jun 30 05:48:24 2010
@@ -39,9 +39,13 @@ dev
  * allow multiple repair sessions per node (CASSANDRA-1190)
 
 
+0.6.4
+ * avoid queuing multiple hint deliveries for the same endpoint
+   (CASSANDRA-1229)
+
+
 0.6.3
  * retry to make streaming connections up to 8 times. (CASSANDRA-1019)
- * fix potential for duplicate rows seen by Hadoop jobs (CASSANDRA-1042)
  * reject describe_ring() calls on invalid keyspaces (CASSANDRA-1111)
  * don't reject reads at CL.ALL (CASSANDRA-1152)
  * reject deletions to supercolumns in CFs containing only standard
@@ -65,6 +69,8 @@ dev
  * remove opportunistic repairs, when two machines with overlapping replica
    responsibilities happen to finish major compactions of the same CF near
    the same time.  repairs are now fully manual (CASSANDRA-1190)
+ * add ability to lower compaction priority (default is no change from 0.6.2)
+   (CASSANDRA-1181)
 
 
 0.6.2

Modified: cassandra/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Jun 30 05:48:24 2010
@@ -45,7 +45,7 @@
     <property name="test.unit.src" value="${test.dir}/unit"/>
     <property name="test.long.src" value="${test.dir}/long"/>
     <property name="dist.dir" value="${build.dir}/dist"/>
-    <property name="version" value="0.6.2"/>
+    <property name="version" value="0.6.3"/>
     <property name="final.name" value="${ant.project.name}-${version}"/>
     <property name="ivy.version" value="2.1.0" />
     <property name="ivy.url"

Modified: cassandra/trunk/debian/changelog
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/debian/changelog?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/debian/changelog (original)
+++ cassandra/trunk/debian/changelog Wed Jun 30 05:48:24 2010
@@ -1,3 +1,9 @@
+cassandra (0.6.3) unstable; urgency=low
+
+  * New stable point release.
+
+ -- Eric Evans <eev...@apache.org>  Fri, 25 Jun 2010 17:18:54 -0500
+
 cassandra (0.6.2) unstable; urgency=low
 
   * New stable point release.

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-958811
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-958811
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-958811
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-958811
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-958811
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Jun 30 05:48:24 2010
@@ -30,9 +30,9 @@ public class DebuggableThreadPoolExecuto
 {
     protected static Logger logger = LoggerFactory.getLogger(JMXEnabledThreadPoolExecutor.class);
 
-    public DebuggableThreadPoolExecutor(String threadPoolName)
+    public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority));
     }
 
     public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java Wed Jun 30 05:48:24 2010
@@ -30,16 +30,26 @@ import java.util.concurrent.atomic.*;
 public class NamedThreadFactory implements ThreadFactory
 {
     protected final String id;
+    private final int priority;
     protected final AtomicInteger n = new AtomicInteger(1);
 
     public NamedThreadFactory(String id)
     {
+        this(id, Thread.NORM_PRIORITY);
+    }
+
+    public NamedThreadFactory(String id, int priority)
+    {
+
         this.id = id;
+        this.priority = priority;
     }
 
     public Thread newThread(Runnable runnable)
     {        
         String name = id + ":" + n.getAndIncrement();
-        return new Thread(runnable, name);
+        Thread thread = new Thread(runnable, name);
+        thread.setPriority(priority);
+        return thread;
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Jun 30 05:48:24 2010
@@ -619,7 +619,9 @@ public class CompactionManager implement
 
         public CompactionExecutor()
         {
-            super("CompactionExecutor");
+            super("CompactionExecutor", System.getProperty("cassandra.compaction.priority") == null
+                                        ? Thread.NORM_PRIORITY
+                                        : Integer.parseInt(System.getProperty("cassandra.compaction.priority")));
         }
 
         @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jun 30 05:48:24 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.net.UnknownHostException;
 import java.util.Collection;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.io.IOException;
@@ -45,6 +46,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.utils.FBUtilities;
 import static org.apache.cassandra.utils.FBUtilities.UTF8;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 
 /**
@@ -84,6 +86,8 @@ public class HintedHandOffManager
     private static final int PAGE_SIZE = 10000;
     private static final String SEPARATOR = "-";
 
+    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
+
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
 
     private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, byte[] key) throws IOException
@@ -170,9 +174,10 @@ public class HintedHandOffManager
 
     }
             
-    private static void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
     {
         logger_.info("Started hinted handoff for endpoint " + endpoint);
+        queuedDeliveries.remove(endpoint);
 
         // 1. Get the key of the endpoint we need to handoff
         // 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
@@ -263,6 +268,9 @@ public class HintedHandOffManager
     */
     public void deliverHints(final InetAddress to)
     {
+        if (!queuedDeliveries.add(to))
+            return;
+
         Runnable r = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.gms;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.gms;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.gms;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IVerbHandler;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jun 30 05:48:24 2010
@@ -501,22 +501,24 @@ public class StorageProxy implements Sto
         long startTime = System.nanoTime();
 
         final String table = command.keyspace;
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace);
 
+        List<AbstractBounds> ranges = getRestrictedRanges(command.range);
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(command.max_keys);
-        for (Pair<AbstractBounds, List<InetAddress>> pair : getRangeIterator(ranges, command.range.left))
+        for (AbstractBounds range : getRangeIterator(ranges, command.range.left))
         {
-            AbstractBounds range = pair.left;
-            List<InetAddress> endpoints = pair.right;
+            List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+
             RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
             Message message = c2.getMessage();
 
             // collect replies and resolve according to consistency level
-            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
+            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
             AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
             QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
-            for (InetAddress endpoint : endpoints)
+           // TODO bail early if live endpoints can't satisfy requested consistency level
+            for (InetAddress endpoint : liveEndpoints)
             {
                 MessagingService.instance.sendRR(message, endpoint, handler);
                 if (logger.isDebugEnabled())
@@ -621,30 +623,30 @@ public class StorageProxy implements Sto
     /**
      * returns an iterator that will return ranges in ring order, starting with the one that contains the start token
      */
-    private static Iterable<Pair<AbstractBounds, List<InetAddress>>> getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges, Token start)
+    private static Iterable<AbstractBounds> getRangeIterator(final List<AbstractBounds> ranges, Token start)
     {
         // find the one to start with
         int i;
         for (i = 0; i < ranges.size(); i++)
         {
-            AbstractBounds range = ranges.get(i).left;
+            AbstractBounds range = ranges.get(i);
             if (range.contains(start) || range.left.equals(start))
                 break;
         }
-        AbstractBounds range = ranges.get(i).left;
+        AbstractBounds range = ranges.get(i);
         assert range.contains(start) || range.left.equals(start); // make sure the loop didn't just end b/c ranges were exhausted
 
         // return an iterable that starts w/ the correct range and iterates the rest in ring order
         final int begin = i;
-        return new Iterable<Pair<AbstractBounds, List<InetAddress>>>()
+        return new Iterable<AbstractBounds>()
         {
-            public Iterator<Pair<AbstractBounds, List<InetAddress>>> iterator()
+            public Iterator<AbstractBounds> iterator()
             {
-                return new AbstractIterator<Pair<AbstractBounds, List<InetAddress>>>()
+                return new AbstractIterator<AbstractBounds>()
                 {
                     int n = 0;
 
-                    protected Pair<AbstractBounds, List<InetAddress>> computeNext()
+                    protected AbstractBounds computeNext()
                     {
                         if (n == ranges.size())
                             return endOfData();
@@ -669,30 +671,38 @@ public class StorageProxy implements Sto
      *     D, but we don't want any other results from it until after the (D, T] range.  Unwrapping so that
      *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
      */
-    private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace)
-    throws UnavailableException
+    private static List<AbstractBounds> getRestrictedRanges(AbstractBounds queryRange)
     {
         TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
-        Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left);
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = new ArrayList<Pair<AbstractBounds, List<InetAddress>>>();
-        while (iter.hasNext())
+
+        List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
+        // for each node, compute its intersection with the query range, and add its unwrapped components to our list
+        for (Token nodeToken : tokenMetadata.sortedTokens())
         {
-            Token nodeToken = iter.next();
             Range nodeRange = new Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
-            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
-
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
-            Set<AbstractBounds> restrictedRanges = queryRange.restrictTo(nodeRange);
-            for (AbstractBounds range : restrictedRanges)
+            for (AbstractBounds range : queryRange.restrictTo(nodeRange))
             {
                 for (AbstractBounds unwrapped : range.unwrap())
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Adding to restricted ranges " + unwrapped + " for " + nodeRange);
-                    ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpoints));
+                    ranges.add(unwrapped);
                 }
             }
         }
+
+        // re-sort ranges in ring order, post-unwrapping
+        Comparator<AbstractBounds> comparator = new Comparator<AbstractBounds>()
+        {
+            public int compare(AbstractBounds o1, AbstractBounds o2)
+            {
+                // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left,
+                // just sort by raw token position.
+                return o1.left.compareTo(o2.left);
+            }
+        };
+        Collections.sort(ranges, comparator);
+
         return ranges;
     }
     

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.io;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import org.junit.Test;
 


Reply via email to