Release sstables of failed stream sessions only when outgoing transfers are finished
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212

Branch: refs/heads/cassandra-3.0
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
  * Revert CASSANDRA-11427 (CASSANDRA-12351)
  * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index f4c900e..294b9c1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber // stream requests to send to the peer protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet(); // streaming tasks are created and managed per ColumnFamily ID - private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); + @VisibleForTesting + protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); // data receivers, filled after receiving prepare message private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>(); private final StreamingMetrics metrics; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index f14abd2..c1c5055 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -22,6 +22,7 @@ import java.util.concurrent.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask private final AtomicInteger sequenceNumber = new AtomicInteger(0); private boolean aborted = false; - private final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); + 
@VisibleForTesting + protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>(); private long totalSize; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index e9a727f..b2af699 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -59,6 +59,9 @@ public class FileMessageHeader public final long repairedAt; public final int sstableLevel; + /* cached size value */ + private transient final long size; + public FileMessageHeader(UUID cfId, int sequenceNumber, String version, @@ -79,6 +82,7 @@ public class FileMessageHeader this.compressionMetadata = null; this.repairedAt = repairedAt; this.sstableLevel = sstableLevel; + this.size = calculateSize(); } public FileMessageHeader(UUID cfId, @@ -101,6 +105,7 @@ public class FileMessageHeader this.compressionMetadata = compressionMetadata; this.repairedAt = repairedAt; this.sstableLevel = sstableLevel; + this.size = calculateSize(); } public boolean isCompressed() @@ -113,23 +118,28 @@ public class FileMessageHeader */ public long size() { - long size = 0; + return size; + } + + private long calculateSize() + { + long transferSize = 0; if (compressionInfo != null) { // calculate total length of transferring chunks for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - size += chunk.length + 4; // 4 bytes for CRC + transferSize += chunk.length + 4; // 4 bytes for CRC } else if (compressionMetadata != null) { - size = compressionMetadata.getTotalSizeForSections(sections); + transferSize = 
compressionMetadata.getTotalSizeForSections(sections); } else { for (Pair<Long, Long> section : sections) - size += section.right - section.left; + transferSize += section.right - section.left; } - return size; + return transferSize; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index c8175ea..a88386e 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.util.List; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputStreamPlus; @@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { - message.serialize(out, version, session); - session.fileSent(message.header); + message.startTransfer(); + try + { + message.serialize(out, version, session); + session.fileSent(message.header); + } + finally + { + message.finishTransfer(); + } } }; @@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage private final Ref<SSTableReader> ref; private final String filename; private boolean completed = false; + private boolean transferring = false; public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean 
keepSSTableLevel) { @@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage writer.write(out); } + @VisibleForTesting + public synchronized void finishTransfer() + { + transferring = false; + //session was aborted mid-transfer, now it's safe to release + if (completed) + { + ref.release(); + } + } + + @VisibleForTesting + public synchronized void startTransfer() + { + transferring = true; + } + public synchronized void complete() { if (!completed) { completed = true; - ref.release(); + //release only if not transferring + if (!transferring) + { + ref.release(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index c3c16b8..02af9a7 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -19,13 +19,18 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.BeforeClass; +import org.junit.After; import org.junit.Test; import junit.framework.Assert; @@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import 
org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Ref; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -57,20 +64,24 @@ public class StreamTransferTaskTest SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); } + @After + public void tearDown() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); + cfs.clearUnsafe(); + } + @Test public void testScheduleTimeout() throws Exception { - String ks = KEYSPACE1; - String cf = "Standard1"; - InetAddress peer = FBUtilities.getBroadcastAddress(); StreamSession session = new StreamSession(peer, peer, null, 0, true, false); - ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables for (int i = 0; i < 2; i++) { - SchemaLoader.insertData(ks, cf, i, 1); + SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1); cfs.forceBlockingFlush(); } @@ -104,4 +115,68 @@ public class StreamTransferTaskTest // when all streaming are done, time out task should not be scheduled. 
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS)); } + + @Test + public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception + { + InetAddress peer = FBUtilities.getBroadcastAddress(); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null); + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator); + StreamSession session = new StreamSession(peer, peer, null, 0, true, false); + session.init(future); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); + + // create two sstables + for (int i = 0; i < 2; i++) + { + SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1); + cfs.forceBlockingFlush(); + } + + // create streaming task that streams those two sstables + StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId); + List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size()); + for (SSTableReader sstable : cfs.getSSTables()) + { + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + Ref<SSTableReader> ref = sstable.selfRef(); + refs.add(ref); + task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0); + } + assertEquals(2, task.getTotalNumberOfFiles()); + + //add task to stream session, so it is aborted when stream session fails + session.transfers.put(UUID.randomUUID(), task); + + //make a copy of outgoing file messages, since task is cleared when it's aborted + Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values()); + + //simulate start transfer + for (OutgoingFileMessage file : files) + { + file.startTransfer(); + } + + //fail stream session mid-transfer + session.onError(new Exception("Fake exception")); + + //make sure reference was not released + for (Ref<SSTableReader> ref : refs) + { + assertEquals(1, ref.globalCount()); + } + + 
//simulate finish transfer + for (OutgoingFileMessage file : files) + { + file.finishTransfer(); + } + + //now reference should be released + for (Ref<SSTableReader> ref : refs) + { + assertEquals(0, ref.globalCount()); + } + } }