Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5 Branch: refs/heads/cassandra-3.0 Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9 Parents: 52be7ba 0398521 Author: Yuki Morishita <yu...@apache.org> Authored: Wed Aug 3 20:18:08 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Aug 3 20:18:08 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamSession.java | 3 +- .../cassandra/streaming/StreamTransferTask.java | 4 +- .../streaming/messages/FileMessageHeader.java | 20 +++-- .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++- .../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++-- 6 files changed, 136 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f0ceb70,87228d3..49733d3 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,32 -1,6 +1,33 @@@ -2.2.8 +3.0.9 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823) + * Fix upgrade of super columns on thrift (CASSANDRA-12335) + * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359) + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277) + * Exception when computing read-repair for range tombstones (CASSANDRA-12263) + * Lost counter writes in compact table and static columns (CASSANDRA-12219) + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) + * Disable RR and speculative 
retry with EACH_QUORUM reads (CASSANDRA-11980) + * Add option to override compaction space check (CASSANDRA-12180) + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114) + * Respond with v1/v2 protocol header when responding to driver that attempts + to connect with too low of a protocol version (CASSANDRA-11464) + * NullPointerException when reading/compacting table (CASSANDRA-11988) + * Fix problem with undeletable rows on upgrade to new sstable format (CASSANDRA-12144) + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107) + * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393) + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315) + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733) + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098) + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944) + * Fix column ordering of results with static columns for Thrift requests in + a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of + those static columns in query results (CASSANDRA-12123) + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090) + * Fix EOF exception when altering column type (CASSANDRA-11820) +Merged from 2.2: + * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) - * Revert CASSANDRA-11427 (CASSANDRA-12351) * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465) * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979) * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index 2b5047d,b2af699..0e06bc0 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade private final CompressionMetadata compressionMetadata; public final long repairedAt; public final int sstableLevel; + public final SerializationHeader.Component header; + /* cached size value */ + private transient final long size; + public FileMessageHeader(UUID cfId, int sequenceNumber, - String version, + Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, @@@ -84,7 -82,7 +87,8 @@@ this.compressionMetadata = null; this.repairedAt = repairedAt; this.sstableLevel = sstableLevel; + this.header = header; + this.size = calculateSize(); } public FileMessageHeader(UUID cfId, @@@ -108,7 -105,7 +112,8 @@@ this.compressionMetadata = compressionMetadata; this.repairedAt = repairedAt; this.sstableLevel = sstableLevel; + this.header = header; + this.size = calculateSize(); } public boolean isCompressed() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --cc 
test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 9572552,02af9a7..dce56eb --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.BeforeClass; + import org.junit.After; import org.junit.Test; import junit.framework.Assert; @@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; + import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.concurrent.Ref; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes // when all streaming are done, time out task should not be scheduled. 
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS)); } + + @Test + public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception + { + InetAddress peer = FBUtilities.getBroadcastAddress(); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null); + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator); + StreamSession session = new StreamSession(peer, peer, null, 0, true, false); + session.init(future); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); + + // create two sstables + for (int i = 0; i < 2; i++) + { + SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1); + cfs.forceBlockingFlush(); + } + + // create streaming task that streams those two sstables + StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId); + List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size()); - for (SSTableReader sstable : cfs.getSSTables()) ++ for (SSTableReader sstable : cfs.getLiveSSTables()) + { + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + Ref<SSTableReader> ref = sstable.selfRef(); + refs.add(ref); + task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0); + } + assertEquals(2, task.getTotalNumberOfFiles()); + + //add task to stream session, so it is aborted when stream session fails + session.transfers.put(UUID.randomUUID(), task); + + //make a copy of outgoing file messages, since task is cleared when it's aborted + Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values()); + + //simulate start transfer + for (OutgoingFileMessage file : files) + { + file.startTransfer(); + } + + //fail stream session mid-transfer + session.onError(new Exception("Fake exception")); + + //make sure reference was not released + for (Ref<SSTableReader> ref : refs) + 
{ + assertEquals(1, ref.globalCount()); + } + + //simulate finish transfer + for (OutgoingFileMessage file : files) + { + file.finishTransfer(); + } + + //now reference should be released + for (Ref<SSTableReader> ref : refs) + { + assertEquals(0, ref.globalCount()); + } + } }