Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec9ce3df Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec9ce3df Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec9ce3df Branch: refs/heads/trunk Commit: ec9ce3dfba0030015c5dd846b8b5b526614cf5f7 Parents: 5484bd1 2836a64 Author: Joel Knighton <j...@apache.org> Authored: Wed Mar 22 13:20:24 2017 -0500 Committer: Joel Knighton <j...@apache.org> Committed: Wed Mar 22 13:22:43 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../gms/GossipDigestAckVerbHandler.java | 27 +++-- src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++---- .../apache/cassandra/service/MigrationTask.java | 12 +- .../cassandra/service/StorageService.java | 17 ++- test/conf/cassandra-seeds.yaml | 43 +++++++ .../apache/cassandra/gms/ShadowRoundTest.java | 116 +++++++++++++++++++ .../apache/cassandra/net/MatcherResponse.java | 24 ++-- 8 files changed, 252 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index ce8535d,9140c73..8386c20 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -37,143 -49,6 +37,144 @@@ Merged from 3.0 live rows in sstabledump (CASSANDRA-13177) * Provide user workaround when system_schema.columns does not contain entries for a table that's in system_schema.tables (CASSANDRA-13180) +Merged from 2.2: ++ * Discard in-flight shadow round responses (CASSANDRA-12653) + * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153) + * Wrong logger name in AnticompactionTask (CASSANDRA-13343) + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282) + * Fix queries updating multiple time the same list 
(CASSANDRA-13130) + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053) + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202) + * Fix failing COPY TO STDOUT (CASSANDRA-12497) + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222) + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018) + * Fix negative mean latency metric (CASSANDRA-12876) + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539) +Merged from 2.1: + * Remove unused repositories (CASSANDRA-13278) + * Log stacktrace of uncaught exceptions (CASSANDRA-13108) + * Use portable stderr for java error in startup (CASSANDRA-13211) + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204) + * Coalescing strategy can enter infinite loop (CASSANDRA-13159) + + +3.10 + * Fix secondary index queries regression (CASSANDRA-13013) + * Add duration type to the protocol V5 (CASSANDRA-12850) + * Fix duration type validation (CASSANDRA-13143) + * Fix flaky GcCompactionTest (CASSANDRA-12664) + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058) + * Fixed query monitoring for range queries (CASSANDRA-13050) + * Remove outboundBindAny configuration property (CASSANDRA-12673) + * Use correct bounds for all-data range when filtering (CASSANDRA-12666) + * Remove timing window in test case (CASSANDRA-12875) + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945) + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919) + * Fix validation of non-frozen UDT cells (CASSANDRA-12916) + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903) + * Fix Murmur3PartitionerTest (CASSANDRA-12858) + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897) + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) + * Fix cassandra-stress truncate option (CASSANDRA-12695) + * Fix crossNode value when receiving messages 
(CASSANDRA-12791) + * Don't load MX4J beans twice (CASSANDRA-12869) + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838) + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845) + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454) + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777) + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803) + * Use different build directories for Eclipse and Ant (CASSANDRA-12466) + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815) + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812) + * Upgrade commons-codec to 1.9 (CASSANDRA-12790) + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) + * Add duration data type (CASSANDRA-11873) + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) + * Improve sum aggregate functions (CASSANDRA-12417) + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) + * cqlsh fails to format collections when using aliases (CASSANDRA-11534) + * Check for hash conflicts in prepared statements (CASSANDRA-12733) + * Exit query parsing upon first error (CASSANDRA-12598) + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729) + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450) + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199) + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461) + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does 
not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) 
(CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar (CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers 
(CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) + * Properly report LWT contention (CASSANDRA-12626) +Merged from 3.0: * Dump threads when unit tests time out (CASSANDRA-13117) * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925) * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index 15662b1,59060f8..d6d9dfb --- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@@ -54,16 -54,8 +54,10 @@@ public class GossipDigestAckVerbHandle if (Gossiper.instance.isInShadowRound()) { if (logger.isDebugEnabled()) - logger.debug("Finishing shadow round with {}", from); - Gossiper.instance.finishShadowRound(epStateMap); + logger.debug("Received an ack from {}, which may trigger exit from shadow round", from); ++ + // if the ack is completely empty, then we can infer that the respondent is also in a shadow round - Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty()); ++ Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap); return; // don't bother doing anything else, we have what we came for } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index ebfd66d,802ff9c..177d7dc --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -30,9 -30,9 +30,10 @@@ import javax.management.ObjectName import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; + import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -112,7 -114,7 +116,8 
@@@ public class Gossiper implements IFailu private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>(); /* initial seeds for joining the cluster */ -- private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator); ++ @VisibleForTesting ++ final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator); /* map where key is the endpoint and value is the state associated with the endpoint */ final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>(); @@@ -126,7 -128,8 +131,10 @@@ private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>(); private volatile boolean inShadowRound = false; ++ // seeds gathered during shadow round that indicated to be in the shadow round phase as well + private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator); + // endpoint states as gathered during shadow round + private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>(); private volatile long lastProcessedMessageAt = System.currentTimeMillis(); @@@ -715,22 -720,16 +725,24 @@@ } /** - * Check if this endpoint can safely bootstrap into the cluster. + * Check if this node can safely be started and join the ring. + * If the node is bootstrapping, examines gossip state for any previous status to decide whether + * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID + * that the node itself has (obtained by reading from system.local or generated if not present) + * with the host ID obtained from gossip for the endpoint address (if any). This latter case + * prevents a non-bootstrapping, new node from being started with the same address of a + * previously started, but currently down predecessor. 
* * @param endpoint - the endpoint to check + * @param localHostUUID - the host id to check + * @param isBootstrapping - whether the node intends to bootstrap when joining + * @param epStates - endpoint states in the cluster - * @return true if the endpoint can join the cluster + * @return true if it is safe to start the node, false otherwise */ - public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping) - public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates) ++ public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping, ++ Map<InetAddress, EndpointState> epStates) { - EndpointState epState = endpointStateMap.get(endpoint); + EndpointState epState = epStates.get(endpoint); - // if there's no previous state, or the node was previously removed from the cluster, we're good if (epState == null || isDeadState(epState)) return true; @@@ -1343,20 -1327,27 +1352,32 @@@ } /** - * Do a single 'shadow' round of gossip, where we do not modify any state - * Used when preparing to join the ring: - * * when replacing a node, to get and assume its tokens - * * when joining, to check that the local host id matches any previous id for the endpoint address + * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the + * map return value, instead of endpointStateMap. + * - * Used when preparing to join the ring: + * <ul> + * <li>when replacing a node, to get and assume its tokens</li> + * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li> + * </ul> + * + * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared - * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update ++ * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. 
This will update + * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the + * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at + * the same time. + * + * @return endpoint states gathered during shadow round or empty map */ - public void doShadowRound() + public synchronized Map<InetAddress, EndpointState> doShadowRound() { buildSeedsList(); + // it may be that the local address is the only entry in the seed + // list in which case, attempting a shadow round is pointless + if (seeds.isEmpty()) - return; ++ return endpointShadowStateMap; + + seedsInShadowRound.clear(); + endpointShadowStateMap.clear(); // send a completely empty syn List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), @@@ -1401,9 -1383,11 +1422,12 @@@ { throw new RuntimeException(wtf); } + + return ImmutableMap.copyOf(endpointShadowStateMap); } -- private void buildSeedsList() ++ @VisibleForTesting ++ void buildSeedsList() { for (InetAddress seed : DatabaseDescriptor.getSeeds()) { @@@ -1521,32 -1505,12 +1545,33 @@@ return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } - protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound) - protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap) ++ protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap) { if (inShadowRound) { - endpointShadowStateMap.putAll(epStateMap); - inShadowRound = false; + if (!isInShadowRound) + { + logger.debug("Received a regular ack from {}, can now exit shadow round", respondent); + // respondent sent back a full ack, so we can exit our shadow round ++ endpointShadowStateMap.putAll(epStateMap); + inShadowRound = false; + seedsInShadowRound.clear(); + } + else + { + // respondent 
indicates it too is in a shadow round, if all seeds + // are in this state then we can exit our shadow round. Otherwise, + // we keep retrying the SR until one responds with a full ACK or + // we learn that all seeds are in SR. + logger.debug("Received an ack from {} indicating it is also in shadow round", respondent); + seedsInShadowRound.add(respondent); + if (seedsInShadowRound.containsAll(seeds)) + { + logger.debug("All seeds are in a shadow round, clearing this node to exit its own"); + inShadowRound = false; + seedsInShadowRound.clear(); + } + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/MigrationTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index b64cf13,6760040..3c0bc1a --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -492,76 -474,52 +492,75 @@@ public class StorageService extends Not daemon.deactivate(); } - public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException + private synchronized UUID prepareForReplacement() throws ConfigurationException { - logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress()); - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(); + if (SystemKeyspace.bootstrapComplete()) + throw new RuntimeException("Cannot replace address with a node that is already bootstrapped"); + + if (!joinRing) + throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); - // make magic happen + if (!DatabaseDescriptor.isAutoBootstrap() && 
!Boolean.getBoolean("cassandra.allow_unsafe_replace")) + throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " + + "guarantees as the expected data may not be present until repair is run. " + + "To perform this operation, please restart with " + + "-Dcassandra.allow_unsafe_replace=true"); + + InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress(); + logger.info("Gathering node replacement information for {}", replaceAddress); - Gossiper.instance.doShadowRound(); + Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound(); - // now that we've gossiped at least once, we should be able to find the node we're replacing - if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null) - throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); - replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates); + // as we've completed the shadow round of gossip, we should be able to find the node we're replacing - if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null) ++ if (epStates.get(replaceAddress) == null) + throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress)); + try { - VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS); - VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); ++ VersionedValue tokensVersionedValue = epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS); if (tokensVersionedValue == null) - throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, 
new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); + throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress)); - if (isReplacingSameAddress()) - { - SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc - } - return tokens; + bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); } catch (IOException e) { throw new RuntimeException(e); } + + UUID localHostId = SystemKeyspace.getLocalHostId(); + + if (isReplacingSameAddress()) + { - localHostId = Gossiper.instance.getHostId(replaceAddress); ++ localHostId = Gossiper.instance.getHostId(replaceAddress, epStates); + SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc + } + - Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need + return localHostId; } - public synchronized void checkForEndpointCollision() throws ConfigurationException + private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException { + if (Boolean.getBoolean("cassandra.allow_unsafe_join")) + { + logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true"); + return; + } + logger.debug("Starting shadow gossip round to check for endpoint collision"); - Gossiper.instance.doShadowRound(); - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(); + Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound(); - if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates)) + // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so. 
+ // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local + // one, which was either read from system.local or generated at startup. If a learned id is present & + // doesn't match the local, then the node needs replacing - if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap())) ++ if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates)) { throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " + "Use cassandra.replace_address if you want to replace this node.", FBUtilities.getBroadcastAddress())); } - if (useStrictConsistency && !allowSimultaneousMoves()) + + if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves()) { - for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates()) + for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet()) { // ignore local node or empty status if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/conf/cassandra-seeds.yaml ---------------------------------------------------------------------- diff --cc test/conf/cassandra-seeds.yaml index 0000000,0000000..02d25d2 new file mode 100644 --- /dev/null +++ b/test/conf/cassandra-seeds.yaml @@@ -1,0 -1,0 +1,43 @@@ ++# ++# Warning! ++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file. 
++# ++cluster_name: Test Cluster ++# memtable_allocation_type: heap_buffers ++memtable_allocation_type: offheap_objects ++commitlog_sync: batch ++commitlog_sync_batch_window_in_ms: 1.0 ++commitlog_segment_size_in_mb: 5 ++commitlog_directory: build/test/cassandra/commitlog ++cdc_raw_directory: build/test/cassandra/cdc_raw ++cdc_enabled: false ++hints_directory: build/test/cassandra/hints ++partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner ++listen_address: 127.0.0.1 ++storage_port: 7010 ++start_native_transport: true ++native_transport_port: 9042 ++column_index_size_in_kb: 4 ++saved_caches_directory: build/test/cassandra/saved_caches ++data_file_directories: ++ - build/test/cassandra/data ++disk_access_mode: mmap ++seed_provider: ++ - class_name: org.apache.cassandra.locator.SimpleSeedProvider ++ parameters: ++ - seeds: "127.0.0.10,127.0.1.10,127.0.2.10" ++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch ++dynamic_snitch: true ++server_encryption_options: ++ internode_encryption: none ++ keystore: conf/.keystore ++ keystore_password: cassandra ++ truststore: conf/.truststore ++ truststore_password: cassandra ++incremental_backups: true ++concurrent_compactors: 4 ++compaction_throughput_mb_per_sec: 0 ++row_cache_class_name: org.apache.cassandra.cache.OHCProvider ++row_cache_size_in_mb: 16 ++enable_user_defined_functions: true ++enable_scripted_user_defined_functions: true http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/gms/ShadowRoundTest.java index 0000000,0000000..f8cc49c new file mode 100644 --- /dev/null +++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java @@@ -1,0 -1,0 +1,116 @@@ ++/* ++* Licensed to the Apache Software Foundation (ASF) under one ++* or more contributor license agreements. 
See the NOTICE file ++* distributed with this work for additional information ++* regarding copyright ownership. The ASF licenses this file ++* to you under the Apache License, Version 2.0 (the ++* "License"); you may not use this file except in compliance ++* with the License. You may obtain a copy of the License at ++* ++* http://www.apache.org/licenses/LICENSE-2.0 ++* ++* Unless required by applicable law or agreed to in writing, ++* software distributed under the License is distributed on an ++* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++* KIND, either express or implied. See the License for the ++* specific language governing permissions and limitations ++* under the License. ++*/ ++ ++package org.apache.cassandra.gms; ++ ++import java.util.Collections; ++import java.util.concurrent.atomic.AtomicBoolean; ++ ++import org.junit.After; ++import org.junit.BeforeClass; ++import org.junit.Test; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.db.Keyspace; ++import org.apache.cassandra.exceptions.ConfigurationException; ++import org.apache.cassandra.locator.IEndpointSnitch; ++import org.apache.cassandra.locator.PropertyFileSnitch; ++import org.apache.cassandra.net.MessageIn; ++import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.net.MockMessagingService; ++import org.apache.cassandra.net.MockMessagingSpy; ++import org.apache.cassandra.service.StorageService; ++ ++import static org.apache.cassandra.net.MockMessagingService.verb; ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertTrue; ++ ++public class ShadowRoundTest ++{ ++ private static final Logger logger = LoggerFactory.getLogger(ShadowRoundTest.class); ++ ++ @BeforeClass ++ public static void setUp() throws ConfigurationException ++ { ++ System.setProperty("cassandra.config", "cassandra-seeds.yaml"); ++ ++ 
DatabaseDescriptor.daemonInitialization(); ++ IEndpointSnitch snitch = new PropertyFileSnitch(); ++ DatabaseDescriptor.setEndpointSnitch(snitch); ++ Keyspace.setInitialized(); ++ } ++ ++ @After ++ public void cleanup() ++ { ++ MockMessagingService.cleanup(); ++ } ++ ++ @Test ++ public void testDelayedResponse() ++ { ++ Gossiper.instance.buildSeedsList(); ++ int noOfSeeds = Gossiper.instance.seeds.size(); ++ ++ final AtomicBoolean ackSend = new AtomicBoolean(false); ++ MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN)) ++ .respondN((msgOut, to) -> ++ { ++ // ACK once to finish shadow round, then busy-spin until gossiper has been enabled ++ // and then reply with remaining ACKs from other seeds ++ if (!ackSend.compareAndSet(false, true)) ++ { ++ while (!Gossiper.instance.isEnabled()) ; ++ } ++ ++ HeartBeatState hb = new HeartBeatState(123, 456); ++ EndpointState state = new EndpointState(hb); ++ GossipDigestAck payload = new GossipDigestAck( ++ Collections.singletonList(new GossipDigest(to, hb.getGeneration(), hb.getHeartBeatVersion())), ++ Collections.singletonMap(to, state)); ++ ++ logger.debug("Simulating digest ACK reply"); ++ return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version); ++ }, noOfSeeds); ++ ++ // GossipDigestAckVerbHandler will send ack2 for each ack received (after the shadow round) ++ MockMessagingSpy spyAck2 = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_ACK2)).dontReply(); ++ ++ // Migration request messages should not be emitted during shadow round ++ MockMessagingSpy spyMigrationReq = MockMessagingService.when(verb(MessagingService.Verb.MIGRATION_REQUEST)).dontReply(); ++ ++ try ++ { ++ StorageService.instance.initServer(); ++ } ++ catch (Exception e) ++ { ++ assertEquals("Unable to contact any seeds!", e.getMessage()); ++ } ++ ++ // we expect one SYN for each seed during shadow round + additional 
SYNs after gossiper has been enabled ++ assertTrue(spySyn.messagesIntercepted > noOfSeeds); ++ ++ // we don't expect to emit any GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages ++ assertEquals(0, spyAck2.messagesIntercepted); ++ assertEquals(0, spyMigrationReq.messagesIntercepted); ++ } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/net/MatcherResponse.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/net/MatcherResponse.java index 21a75c9,0000000..6cd8085 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/net/MatcherResponse.java +++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java @@@ -1,208 -1,0 +1,214 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Sends a response for an incoming message with a matching {@link Matcher}. + * The actual behavior by any instance of this class can be inspected by + * interacting with the returned {@link MockMessagingSpy}. + */ +public class MatcherResponse +{ + private final Matcher<?> matcher; + private final Set<Integer> sendResponses = new HashSet<>(); + private final MockMessagingSpy spy = new MockMessagingSpy(); + private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE); + private IMessageSink sink; + + MatcherResponse(Matcher<?> matcher) + { + this.matcher = matcher; + } + + /** + * Do not create any responses for intercepted outbound messages. + */ + public MockMessagingSpy dontReply() + { + return respond((MessageIn<?>)null); + } + + /** + * Respond with provided message in reply to each intercepted outbound message. + * @param message the message to use as mock reply from the cluster + */ + public MockMessagingSpy respond(MessageIn<?> message) + { + return respondN(message, Integer.MAX_VALUE); + } + + /** + * Respond a limited number of times with the provided message in reply to each intercepted outbound message. + * @param response the message to use as mock reply from the cluster + * @param limit number of times to respond with message + */ + public MockMessagingSpy respondN(final MessageIn<?> response, int limit) + { + return respondN((in, to) -> response, limit); + } + + /** + * Respond with the message created by the provided function that will be called with each intercepted outbound message. 
+ * @param fnResponse function to call for creating reply based on intercepted message and target address + */ + public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse) + { + return respondN(fnResponse, Integer.MAX_VALUE); + } + + /** + * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message. + * The target address from the intercepted message will automatically be used as the created message's sender address. + * @param fnResponse function to call for creating payload object based on intercepted message and target address + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb) + { + return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE); + } + + /** + * Respond a limited number of times with message wrapping the payload object created by provided function called for + * each intercepted outbound message. The target address from the intercepted message will automatically be used as the + * created message's sender address. + * @param fnResponse function to call for creating payload object based on intercepted message and target address + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit) + { + return respondN((MessageOut<T> msg, InetAddress to) -> { + S payload = fnResponse.apply(msg); + if (payload == null) + return null; + else + return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version); + }, + limit); + } + + /** + * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed + * from the provided queue. No reply will be send when the queue has been exhausted. 
+ * @param cannedResponses prepared payload messages to use for responses + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb) + { + return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb); + } + + /** + * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed + * from the provided queue. This method will block until queue elements are available. + * @param cannedResponses prepared payload messages to use for responses + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb) + { + return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> { + try + { + return cannedResponses.take(); + } + catch (InterruptedException e) + { + return null; + } + }, verb); + } + + /** + * Respond a limited number of times with the message created by the provided function that will be called with + * each intercepted outbound message. 
+ * @param fnResponse function to call for creating reply based on intercepted message and target address + */ + public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit) + { + limitCounter.set(limit); + + assert sink == null: "destroy() must be called first to register new response"; + + sink = new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + // prevent outgoing message from being send in case matcher indicates a match + // and instead send the mocked response + if (matcher.matches(message, to)) + { + spy.matchingMessage(message); + + if (limitCounter.decrementAndGet() < 0) + return false; + + synchronized (sendResponses) + { + // I'm not sure about retry semantics regarding message/ID relationships, but I assume + // sending a message multiple times using the same ID shouldn't happen.. + assert !sendResponses.contains(id) : "ID re-use for outgoing message"; + sendResponses.add(id); + } - MessageIn<?> response = fnResponse.apply(message, to); - if (response != null) ++ ++ // create response asynchronously to match request/response communication execution behavior ++ new Thread(() -> + { - CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id); - if (cb != null) - cb.callback.response(response); - else - MessagingService.instance().receive(response, id); - spy.matchingResponse(response); - } ++ MessageIn<?> response = fnResponse.apply(message, to); ++ if (response != null) ++ { ++ CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id); ++ if (cb != null) ++ cb.callback.response(response); ++ else ++ MessagingService.instance().receive(response, id); ++ spy.matchingResponse(response); ++ } ++ }).start(); ++ + return false; + } + return true; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return true; + } + }; + MessagingService.instance().addMessageSink(sink); + + return spy; + } + + 
/**
 * Stops the currently registered response from being sent and releases the sink, so that a new
 * response can be registered on this instance afterwards.
 *
 * Calling this method while no response is registered (never registered, or already destroyed)
 * is a no-op.
 */
public void destroy()
{
    // sink is only assigned by respondN(BiFunction, int); nothing to unregister before that
    if (sink == null)
        return;

    MessagingService.instance().removeMessageSink(sink);

    // respondN(BiFunction, int) asserts "sink == null" before installing a new sink and tells the
    // caller to invoke destroy() first, so the reference has to be cleared here; otherwise that
    // assertion fails on every re-registration
    sink = null;
}
}