Author: jbellis Date: Tue Jun 14 14:33:20 2011 New Revision: 1135611 URL: http://svn.apache.org/viewvc?rev=1135611&view=rev Log: use threadsafe collections for StreamInSession patch by jbellis; reviewed by slebresne for CASSANDRA-2766
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1135611&r1=1135610&r2=1135611&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jun 14 14:33:20 2011 @@ -55,6 +55,7 @@ * avoid skipping rows in scrub for counter column family (CASSANDRA-2759) * fix ConcurrentModificationException in repair when dealing with 0.7 node (CASSANDRA-2767) + * use threadsafe collections for StreamInSession (CASSANDRA-2766) 0.8.0-final Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1135611&r1=1135610&r2=1135611&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Jun 14 14:33:20 2011 @@ -24,18 +24,20 @@ import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; -import org.apache.cassandra.gms.Gossiper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.cliffc.high_scale_lib.NonBlockingHashSet; /** each context gets its own StreamInSession. So there may be >1 Session per host */ public class StreamInSession @@ -44,11 +46,11 @@ public class StreamInSession private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>(); - private final List<PendingFile> files = new ArrayList<PendingFile>(); + private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>(); private final Pair<InetAddress, Long> context; private final Runnable callback; private String table; - private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>(); + private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>(); private PendingFile current; private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)