Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 eeaa3e012 -> 1eea31460 refs/heads/cassandra-2.1 6afab52b2 -> dcc90ef35 refs/heads/trunk 66348bbe5 -> 12f17b203
Fix race condition in StreamTransferTask that could lead to infinite loops and premature sstable deletion. Patch by benedict; reviewed by yukim for CASSANDRA-7704. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eeaa3e01 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eeaa3e01 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eeaa3e01 Branch: refs/heads/cassandra-2.1 Commit: eeaa3e01235c98421fecc46eaed877b207fb5a33 Parents: 8078a58 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Jan 7 19:44:00 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Jan 7 19:44:00 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/streaming/StreamTransferTask.java | 73 ++++++++++++-------- .../streaming/StreamTransferTaskTest.java | 19 +++-- 3 files changed, 62 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eeaa3e01/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c1bb28c..9ccbf45 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.12: + * Fix race condition in StreamTransferTask that could lead to + infinite loops and premature sstable deletion (CASSANDRA-7704) * Add an extra version check to MigrationTask (CASSANDRA-8462) * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499) * Increase bf true positive count on key cache hit (CASSANDRA-8525) http://git-wip-us.apache.org/repos/asf/cassandra/blob/eeaa3e01/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index a543d01..5b75555 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -19,8 +19,10 @@ package org.apache.cassandra.streaming; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; @@ -30,13 +32,13 @@ import org.apache.cassandra.utils.Pair; */ public class StreamTransferTask extends StreamTask { - private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts")); private final AtomicInteger sequenceNumber = new AtomicInteger(0); + private boolean aborted = false; - private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>(); - - private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>(); + private final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); + private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>(); private long totalSize; @@ -45,7 +47,7 @@ public class StreamTransferTask extends StreamTask super(session, cfId); } - public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections) + public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections) { assert sstable != null && cfId.equals(sstable.metadata.cfId); OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, 
sections); @@ -58,31 +60,42 @@ public class StreamTransferTask extends StreamTask * * @param sequenceNumber sequence number of file */ - public synchronized void complete(int sequenceNumber) + public void complete(int sequenceNumber) { - OutgoingFileMessage file = files.remove(sequenceNumber); - if (file != null) + boolean signalComplete; + synchronized (this) { - file.sstable.releaseReference(); - // all file sent, notify session this task is complete. - if (files.isEmpty()) - { - timeoutExecutor.shutdownNow(); - session.taskCompleted(this); - } + ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber); + if (timeout != null) + timeout.cancel(false); + + OutgoingFileMessage file = files.remove(sequenceNumber); + if (file != null) + file.sstable.releaseReference(); + + signalComplete = files.isEmpty(); } + + // all file sent, notify session this task is complete. + if (signalComplete) + session.taskCompleted(this); } - public void abort() + public synchronized void abort() { + if (aborted) + return; + aborted = true; + + for (ScheduledFuture future : timeoutTasks.values()) + future.cancel(false); + timeoutTasks.clear(); + for (OutgoingFileMessage file : files.values()) - { file.sstable.releaseReference(); - } - timeoutExecutor.shutdownNow(); } - public int getTotalNumberOfFiles() + public synchronized int getTotalNumberOfFiles() { return files.size(); } @@ -92,17 +105,17 @@ public class StreamTransferTask extends StreamTask return totalSize; } - public Collection<OutgoingFileMessage> getFileMessages() + public synchronized Collection<OutgoingFileMessage> getFileMessages() { // We may race between queuing all those messages and the completion of the completion of - // the first ones. So copy the values to avoid a ConcurrentModificationException + // the first ones. 
So copy tthe values to avoid a ConcurrentModificationException return new ArrayList<>(files.values()); } public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) { // remove previous time out task to be rescheduled later - ScheduledFuture future = timeoutTasks.get(sequenceNumber); + ScheduledFuture future = timeoutTasks.remove(sequenceNumber); if (future != null) future.cancel(false); return files.get(sequenceNumber); @@ -120,18 +133,24 @@ public class StreamTransferTask extends StreamTask */ public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) { - if (timeoutExecutor.isShutdown()) + if (!files.containsKey(sequenceNumber)) return null; ScheduledFuture future = timeoutExecutor.schedule(new Runnable() { public void run() { - StreamTransferTask.this.complete(sequenceNumber); - timeoutTasks.remove(sequenceNumber); + synchronized (StreamTransferTask.this) + { + // remove so we don't cancel ourselves + timeoutTasks.remove(sequenceNumber); + StreamTransferTask.this.complete(sequenceNumber); + } } }, time, unit); - timeoutTasks.put(sequenceNumber, future); + + ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future); + assert prev == null; return future; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eeaa3e01/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index b51f75b..1c28cbd 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -20,11 +20,14 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; +import 
java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.Test; +import junit.framework.Assert; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -61,19 +64,25 @@ public class StreamTransferTaskTest extends SchemaLoader { List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges)); + task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges), 0); } assertEquals(2, task.getTotalNumberOfFiles()); // if file sending completes before timeout then the task should be canceled. - ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS); - task.complete(0); - // timeout task may run after complete but it is noop + Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS); f.get(); // when timeout runs on second file, task should be completed f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS); - f.get(); + task.complete(1); + try + { + f.get(); + Assert.assertTrue(false); + } + catch (CancellationException ex) + { + } assertEquals(StreamSession.State.WAIT_COMPLETE, session.state()); // when all streaming are done, time out task should not be scheduled.