Author: gdusbabek
Date: Fri May 14 17:07:02 2010
New Revision: 944343

URL: http://svn.apache.org/viewvc?rev=944343&view=rev
Log:
remove from streamManagers when finished. Patch by gdusbabek, reviewed by 
stuhood. CASSANDRA-1076

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=944343&r1=944342&r2=944343&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri 
May 14 17:07:02 2010
@@ -72,9 +72,8 @@ public class StreamOut
     {
         assert ranges.size() > 0;
         
-        // this is a sneaking way of indicating target as a destination node. 
it is a lame way of doing it and will 
-        // change as part of fixing CASSANDRA-1076.
-        StreamOutManager.get(target);
+        // this is so that this target shows up as a destination while 
anticompaction is happening.
+        StreamOutManager.pendingDestinations.add(target);
 
         logger.debug("Beginning transfer process to " + target + " for ranges 
" + StringUtils.join(ranges, ", "));
 
@@ -113,6 +112,7 @@ public class StreamOut
         finally
         {
             StreamingService.instance.setStatus(StreamingService.NOTHING);
+            StreamOutManager.remove(target);
         }
         if (callback != null)
             callback.run();

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=944343&r1=944342&r2=944343&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java 
Fri May 14 17:07:02 2010
@@ -48,6 +48,7 @@ public class StreamOutManager
     private static Logger logger = LoggerFactory.getLogger( 
StreamOutManager.class );
         
     private static ConcurrentMap<InetAddress, StreamOutManager> streamManagers 
= new ConcurrentHashMap<InetAddress, StreamOutManager>();
+    public static final Set<InetAddress> pendingDestinations = 
Collections.synchronizedSet(new HashSet<InetAddress>());
 
     public static StreamOutManager get(InetAddress to)
     {
@@ -60,11 +61,21 @@ public class StreamOutManager
         }
         return manager;
     }
+    
+    public static void remove(InetAddress to)
+    {
+        if (streamManagers.containsKey(to) && 
streamManagers.get(to).files.size() == 0)
+            streamManagers.remove(to);
+        pendingDestinations.remove(to);
+    }
 
     public static Set<InetAddress> getDestinations()
     {
         // the results of streamManagers.keySet() isn't serializable, so 
create a new set.
-        return new HashSet(streamManagers.keySet());
+        Set<InetAddress> hosts = new HashSet<InetAddress>();
+        hosts.addAll(streamManagers.keySet());
+        hosts.addAll(pendingDestinations);
+        return hosts;
     }
 
     // we need sequential and random access to the files. hence, the map and 
the list.


Reply via email to