Author: jbellis Date: Tue Jun 14 15:16:04 2011 New Revision: 1135638 URL: http://svn.apache.org/viewvc?rev=1135638&view=rev Log: backport #2766 from 0.8
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1135638&r1=1135637&r2=1135638&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jun 14 15:16:04 2011 @@ -18,6 +18,7 @@ * fix nodetool ring use with Ec2Snitch (CASSANDRA-2733) * fix removing columns and subcolumns that are supressed by a row or supercolumn tombstone during replica resolution (CASSANDRA-2590) + * use threadsafe collections for StreamInSession (CASSANDRA-2766) 0.7.6 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1135638&r1=1135637&r2=1135638&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Jun 14 15:16:04 2011 @@ -24,6 +24,7 @@ import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ import org.apache.cassandra.io.sstable.S import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.cliffc.high_scale_lib.NonBlockingHashSet; /** each context gets its own StreamInSession. So there may be >1 Session per host */ public class StreamInSession @@ -43,11 +45,11 @@ public class StreamInSession private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>(); - private final List<PendingFile> files = new ArrayList<PendingFile>(); + private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>(); private final Pair<InetAddress, Long> context; private final Runnable callback; private String table; - private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>(); + private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>(); private PendingFile current; private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)