Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4a53f4d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4a53f4d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4a53f4d Branch: refs/heads/trunk Commit: e4a53f4d3160833af3ea7917a35e7e35ae02786d Parents: ab98b11 b39d984 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Wed Aug 31 20:24:03 2016 +0100 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Aug 31 20:25:24 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 5 +- .../apache/cassandra/gms/VersionedValue.java | 6 + .../apache/cassandra/locator/TokenMetadata.java | 52 ++++++- .../cassandra/service/LoadBroadcaster.java | 2 +- .../cassandra/service/StorageService.java | 136 +++++++++++++++---- 6 files changed, 173 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 4b77e4d,d7e9394..30931d3 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,50 -1,5 +1,51 @@@ -2.2.8 +3.0.9 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208) + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889) + * Fix file handle leaks due to simultaneous compaction/repair and + listing snapshots, calculating snapshot sizes, or making schema + changes (CASSANDRA-11594) + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508) + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504) + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385) + * Calculate last compacted key on startup (CASSANDRA-6216) + * Add schema to 
snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190) + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436) + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331) + * Backport CASSANDRA-12002 (CASSANDRA-12177) + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100) + * Fix potential bad messaging service message for paged range reads + within mixed-version 3.x clusters (CASSANDRA-12249) + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828) + * NullPointerException during compaction on table with static columns (CASSANDRA-12336) + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823) + * Fix upgrade of super columns on thrift (CASSANDRA-12335) + * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359) + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277) + * Exception when computing read-repair for range tombstones (CASSANDRA-12263) + * Lost counter writes in compact table and static columns (CASSANDRA-12219) + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980) + * Add option to override compaction space check (CASSANDRA-12180) + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114) + * Respond with v1/v2 protocol header when responding to driver that attempts + to connect with too low of a protocol version (CASSANDRA-11464) + * NullPointerException when reading/compacting table (CASSANDRA-11988) + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144) + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107) + * Wait until the message is 
being sent to decide which serializer must be used (CASSANDRA-11393) + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315) + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733) + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098) + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944) + * Fix column ordering of results with static columns for Thrift requests in + a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of + those static columns in query results (CASSANDRA-12123) + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090) + * Fix EOF exception when altering column type (CASSANDRA-11820) +Merged from 2.2: + * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523) * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522) * Fail repair on non-existing table (CASSANDRA-12279) * cqlsh copy: fix missing counter values (CASSANDRA-12476) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java index 97c5f10,b06c9c8..b50db00 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@@ -343,6 -337,43 +352,43 @@@ public class TokenMetadat } } + public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode) + { + assert replacingTokens != null && !replacingTokens.isEmpty(); + assert newNode != null && oldNode != null; + + lock.writeLock().lock(); + try + { + Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode); + if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens)) + { + throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " + + "different set of tokens %s.", newNode, oldNode, oldNodeTokens, + replacingTokens)); + } + + logger.debug("Replacing {} with {}", newNode, oldNode); + replacementToOriginal.put(newNode, oldNode); + + addBootstrapTokens(replacingTokens, newNode, oldNode); + } + finally + { + lock.writeLock().unlock(); + } + } + + public Optional<InetAddress> getReplacementNode(InetAddress endpoint) + { - return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint)); ++ return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint)); + } + + public Optional<InetAddress> getReplacingNode(InetAddress endpoint) + { - return Optional.fromNullable((replacementToOriginal.get(endpoint))); ++ return Optional.ofNullable((replacementToOriginal.get(endpoint))); + } + public void removeBootstrapTokens(Collection<Token> tokens) { assert tokens != null && !tokens.isEmpty(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 60cb86b,9197ab1..c06bed2 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -454,9 -442,12 +454,12 @@@ 
public class StorageService extends Not VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); if (tokensVersionedValue == null) throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); + Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); - SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc + if (isReplacingSameAddress()) + { + SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc + } Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need return tokens; } @@@ -988,9 -952,19 +996,19 @@@ } } + private void finishJoiningRing() + { + // start participating in the ring. 
+ SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + setTokens(bootstrapTokens); + + assert tokenMetadata.sortedTokens().size() > 0; + doAuthSetup(); + } + private void doAuthSetup() { - maybeAddOrUpdateKeyspace(AuthKeyspace.definition()); + maybeAddOrUpdateKeyspace(AuthKeyspace.metadata()); DatabaseDescriptor.getRoleManager().setup(); DatabaseDescriptor.getAuthenticator().setup(); @@@ -1709,18 -1681,11 +1732,23 @@@ } } + private static String[] splitValue(VersionedValue value) + { + return value.value.split(VersionedValue.DELIMITER_STR, -1); + } + + private void updateNetVersion(InetAddress endpoint, VersionedValue value) + { + try + { + MessagingService.instance().setVersion(endpoint, Integer.valueOf(value.value)); + } + catch (NumberFormatException e) + { + throw new AssertionError("Got invalid value for NET_VERSION application state: " + value.value); + } + } + public void updateTopology(InetAddress endpoint) { if (getTokenMetadata().isMember(endpoint)) @@@ -1885,6 -1850,43 +1913,42 @@@ tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint); } - + private void handleStateBootreplacing(InetAddress newNode, String[] pieces) + { + InetAddress oldNode; + try + { + oldNode = InetAddress.getByName(pieces[1]); + } + catch (Exception e) + { + logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces[1], e); + return; + } + + if (FailureDetector.instance.isAlive(oldNode)) + { + throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode)); + } + + Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode); + if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode)) + { + throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.", + newNode, replacingNode.get(), oldNode)); + } + + Collection<Token> tokens = getTokensFor(newNode); + + if (logger.isDebugEnabled()) + logger.debug("Node 
{} is replacing {}, tokens {}", newNode, oldNode, tokens); + + tokenMetadata.addReplaceTokens(tokens, newNode, oldNode); + PendingRangeCalculatorService.instance.update(); + + tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode); + } + /** * Handle node move to normal state. That is, node is entering token ring and participating * in reads.