Author: gdusbabek Date: Fri May 14 17:07:02 2010 New Revision: 944343 URL: http://svn.apache.org/viewvc?rev=944343&view=rev Log: remove from streamManagers when finished. Patch by gdusbabek, reviewed by stuhood. CASSANDRA-1076
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=944343&r1=944342&r2=944343&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri May 14 17:07:02 2010 @@ -72,9 +72,8 @@ public class StreamOut { assert ranges.size() > 0; - // this is a sneaking way of indicating target as a destination node. it is a lame way of doing it and will - // change as part of fixing CASSANDRA-1076. - StreamOutManager.get(target); + // this is so that this target shows up as a destination while anticompaction is happening. + StreamOutManager.pendingDestinations.add(target); logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, ", ")); @@ -113,6 +112,7 @@ public class StreamOut finally { StreamingService.instance.setStatus(StreamingService.NOTHING); + StreamOutManager.remove(target); } if (callback != null) callback.run(); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=944343&r1=944342&r2=944343&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Fri May 14 17:07:02 2010 @@ -48,6 +48,7 @@ public class StreamOutManager private static Logger logger = LoggerFactory.getLogger( StreamOutManager.class ); private static ConcurrentMap<InetAddress, StreamOutManager> streamManagers = new ConcurrentHashMap<InetAddress, StreamOutManager>(); + public static final Set<InetAddress> pendingDestinations = Collections.synchronizedSet(new HashSet<InetAddress>()); public static StreamOutManager get(InetAddress to) { @@ -60,11 +61,21 @@ public class StreamOutManager } return manager; } + + public static void remove(InetAddress to) + { + if (streamManagers.containsKey(to) && streamManagers.get(to).files.size() == 0) + streamManagers.remove(to); + pendingDestinations.remove(to); + } public static Set<InetAddress> getDestinations() { // the results of streamManagers.keySet() isn't serializable, so create a new set. - return new HashSet(streamManagers.keySet()); + Set<InetAddress> hosts = new HashSet<InetAddress>(); + hosts.addAll(streamManagers.keySet()); + hosts.addAll(pendingDestinations); + return hosts; } // we need sequential and random access to the files. hence, the map and the list.