Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb4540ec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb4540ec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb4540ec
Branch: refs/heads/cassandra-3.0
Commit: cb4540ec2775e0d034150fad19d9c570443f17d2
Parents: 2e47636 3b448b3
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Jun 28 10:17:56 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Jun 28 10:17:56 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../net/IncomingStreamingConnection.java | 7 ++--
.../apache/cassandra/net/MessagingService.java | 12 +++++-
.../cassandra/streaming/ConnectionHandler.java | 39 +++++++++++++++-----
.../cassandra/streaming/StreamResultFuture.java | 27 +++++++-------
.../streaming/StreamingTransferTest.java | 25 ++++++++++++-
6 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb4540ec/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6b0c4f1,8d2062d..314a93e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,37 -2,9 +9,38 @@@ Merged from 2.2
* Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
* Validate bloom_filter_fp_chance against lowest supported value when the table is created (CASSANDRA-11920)
- * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
* Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
* StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+Merged from 2.1:
++ * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
+ * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
+ * Avoid
marking too many sstables as repaired (CASSANDRA-11696) + * Prevent select statements with clustering key > 64k (CASSANDRA-11882) + * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) + * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842) + * Cache local ranges when calculating repair neighbors (CASSANDRA-11934) + * Allow LWT operation on static column with only partition keys (CASSANDRA-10532) + * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886) + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) + + +3.0.7 + * Fix legacy serialization of Thrift-generated non-compound range tombstones + when communicating with 2.x nodes (CASSANDRA-11930) + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849) + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912) + * Fix sstables not being protected from removal during index build (CASSANDRA-11905) + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032) + * Remove unneeded code to repair index summaries that have + been improperly down-sampled (CASSANDRA-11127) + * Avoid WriteTimeoutExceptions during commit log replay due to materialized + view lock contention (CASSANDRA-11891) + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530) + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb4540ec/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb4540ec/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index 835beed,8c8a333..fac46eb --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -1229,8 -1188,12 +1231,14 @@@ public final class MessagingService imp public static void validatePartitioner(AbstractBounds<?> bounds) { if (globalPartitioner() != bounds.left.getPartitioner()) - throw new AssertionError(); + throw new AssertionError(String.format("Partitioner in bounds serialization. Expected %s, was %s.", + globalPartitioner().getClass().getName(), + bounds.left.getPartitioner().getClass().getName())); } + + @VisibleForTesting + public List<SocketThread> getSocketThreads() + { + return socketThreads; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb4540ec/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index f21a89d,2b16267..7223e76 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@@ -46,11 -56,14 +46,12 @@@ import org.apache.cassandra.db.partitio import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableUtils; -import org.apache.cassandra.locator.SimpleStrategy; + import org.apache.cassandra.net.MessagingService; +import 
org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; @@@ -289,36 -325,27 +312,36 @@@ public class StreamingTransferTes String cfname = "StandardInteger1"; Keyspace keyspace = Keyspace.open(ks); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + ClusteringComparator comparator = cfs.getComparator(); - String key = "key0"; - Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key)); - // add columns of size slightly less than column_index_size to force insert column index - rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2); - rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2); - ColumnFamily cf = rm.addOrGet(cfname); - // add RangeTombstones - cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); - cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); - cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); - rm.applyUnsafe(); + String key = "key1"; + + + RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key); - key = "key1"; - rm = new Mutation(ks, ByteBufferUtil.bytes(key)); // add columns of size slightly less than column_index_size to force insert column index - rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2); - cf = rm.addOrGet(cfname); + updates.clustering(1) + 
.add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64])) + .build() + .apply(); + + updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key); + updates.clustering(6) + .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()])) - .build() ++ .build() + .apply(); + // add RangeTombstones - cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); - rm.applyUnsafe(); + //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key); + //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4))) + // .build() + // .apply(); + + + updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key); + updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7))) + .build() + .apply(); cfs.forceBlockingFlush();