This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e3b3a59c6c2832bcbd3850340f0138e876cfcb5d Merge: 2fde9af74f ce6a65cb29 Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Fri Apr 8 14:39:37 2022 -0600 Merge branch 'cassandra-4.0' into trunk CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 4 ++ .../schema/DefaultSchemaUpdateHandler.java | 3 +- .../cassandra/schema/MigrationCoordinator.java | 44 ++++++++++++++++------ .../cassandra/distributed/action/GossipHelper.java | 3 +- .../cassandra/distributed/impl/Instance.java | 18 ++++++--- .../distributed/test/MigrationCoordinatorTest.java | 12 +++--- .../cluster/OnInstanceSyncSchemaForBootstrap.java | 3 +- 8 files changed, 63 insertions(+), 25 deletions(-) diff --cc CHANGES.txt index 8a52c9b32e,32fdd68552..d450f81726 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,138 -1,5 +1,139 @@@ -4.0.4 +4.1 + * Clients using JMX are unable to handle non-standard java types but we leak this into our interfaces (CASSANDRA-17527) + * Remove stress server functionality (CASSANDRA-17535) + * Reduce histogram snapshot long[] allocation overhead during speculative read and write threshold updates (CASSANDRA-17523) + * Add guardrail for creation of secondary indexes (CASSANDRA-17498) + * Add guardrail to disallow creation of uncompressed tables (CASSANDRA-17504) + * Add guardrail to disallow creation of new COMPACT STORAGE tables (CASSANDRA-17522) + * repair vtables should expose a completed field due to lack of filtering options in CQL (CASSANDRA-17520) + * remove outdated code from cqlsh (CASSANDRA-17490) + * remove support for deprecated version specific TLS in Python 3.6 (CASSANDRA-17365) + * Add support for IF EXISTS and IF NOT EXISTS in ALTER statements (CASSANDRA-16916) + * resolve several pylint issues in cqlsh.py and pylib (CASSANDRA-17480) + * Streaming sessions longer than 3 minutes fail with timeout (CASSANDRA-17510) + * Add ability to track state in repair (CASSANDRA-15399) + * Remove unused 'parse' module (CASSANDRA-17484) + * change six functions in cqlshlib to native Python 3 (CASSANDRA-17417) + * reduce hot-path object allocations required to record local/remote requests against the client request metrics (CASSANDRA-17424) + * Disallow removing DC from system_auth while nodes are active in the DC (CASSANDRA-17478) + * Add guardrail for the number of fields per UDT (CASSANDRA-17385) + * Allow users to change cqlsh history location using env variable (CASSANDRA-17448) + * Add required -f option to use nodetool verify and standalone sstableverify (CASSANDRA-17017) + * Add support for UUID based sstable generation identifiers (CASSANDRA-17048) + * Log largest memtable flush at info instead of debug (CASSANDRA-17472) + * Add native transport rate limiter options to example cassandra.yaml, and expose metric for dispatch rate (CASSANDRA-17423) + * Add diagnostic events for guardrails (CASSANDRA-17197) + * Pre hashed passwords in CQL (CASSANDRA-17334) + * Increase cqlsh version (CASSANDRA-17432) + * Update SUPPORTED_UPGRADE_PATHS to include 3.0 and 3.x to 4.1 paths and remove obsolete tests (CASSANDRA-17362) + * Support DELETE in CQLSSTableWriter (CASSANDRA-14797) + * Failed inbound internode authentication failures generate ugly warning with stack trace (CASSANDRA-17068) + * Expose gossip information in system_views.gossip_info virtual table (CASSANDRA-17002) + * Add guardrails for collection items and size (CASSANDRA-17153) + * Improve guardrails messages (CASSANDRA-17430) + * Remove all usages of junit.framework and ban them via Checkstyle (CASSANDRA-17316) + * Add guardrails for read/write consistency levels (CASSANDRA-17188) + * Add guardrail for SELECT IN terms and their cartesian product (CASSANDRA-17187) + * remove unused imports in cqlsh.py and cqlshlib (CASSANDRA-17413) + * deprecate property windows_timer_interval (CASSANDRA-17404) + * Expose streaming as a vtable (CASSANDRA-17390) + * Expose all client options via system_views.clients and nodetool clientstats (CASSANDRA-16378) + * Make startup checks configurable (CASSANDRA-17220) + * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186) + * update Python test framework from nose to pytest (CASSANDRA-17293) + * Fix improper CDC commit log segments deletion in non-blocking mode (CASSANDRA-17233) + * Add support for string concatenations through the + operator (CASSANDRA-17190) + * Limit the maximum hints size per host (CASSANDRA-17142) + * Add a virtual table for exposing batch metrics (CASSANDRA-17225) + * Flatten guardrails config (CASSANDRA-17353) + * Instance failed to start up due to NPE in StartupClusterConnectivityChecker (CASSANDRA-17347) + * add the shorter version of version flag (-v) in cqlsh (CASSANDRA-17236) + * Make vtables accessible via internode messaging (CASSANDRA-17295) + * Add support for PEM based key material for SSL (CASSANDRA-17031) + * Standardize storage configuration parameters' names. Support unit suffixes. (CASSANDRA-15234) + * Remove support for Windows (CASSANDRA-16956) + * Runtime-configurable YAML option to prohibit USE statements (CASSANDRA-17318) + * When streaming sees a ClosedChannelException this triggers the disk failure policy (CASSANDRA-17116) + * Add a virtual table for exposing prepared statements metrics (CASSANDRA-17224) + * Remove python 2.x support from cqlsh (CASSANDRA-17242) + * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958) + * Make capacity/validity/updateinterval/activeupdate for Auth Caches configurable via nodetool (CASSANDRA-17063) + * Added startup check for read_ahead_kb setting (CASSANDRA-16436) + * Avoid unecessary array allocations and initializations when performing query checks (CASSANDRA-17209) + * Add guardrail for list operations that require read before write (CASSANDRA-17154) + * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195) + * Remove self-reference in SSTableTidier (CASSANDRA-17205) + * Add guardrail for query page size (CASSANDRA-17189) + * Allow column_index_size_in_kb to be configurable through nodetool (CASSANDRA-17121) + * Emit a metric for number of local read and write calls + * Add non-blocking mode for CDC writes (CASSANDRA-17001) + * Add guardrails framework (CASSANDRA-17147) + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174) + * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082) + * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992) + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159) + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069) + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130) + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065) + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914) + * Actively update auth cache in the background (CASSANDRA-16957) + * Add unix time conversion functions (CASSANDRA-17029) + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128) + * Forbid other Future implementations with checkstyle (CASSANDRA-17055) + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085) + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106) + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054) + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023) + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309) + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030) + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027) + * Log time spent writing keys during compaction (CASSANDRA-17037) + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976) + * Add metrics and logging around index summary redistribution (CASSANDRA-17036) + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557) + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795) + * Add broadcast_rpc_address to system.local (CASSANDRA-11181) + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337) + * add credentials file support to CQLSH (CASSANDRA-16983) + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026) + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016) + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879) + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666) + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896) + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290) + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988) + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844) + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153) + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806) + * Include SASI components to snapshots (CASSANDRA-15134) + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938) + * Reduce native transport max frame size to 16MB (CASSANDRA-16886) + * Add support for filtering using IN restrictions (CASSANDRA-14344) + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404) + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850) + * Add TTL support to nodetool snapshots (CASSANDRA-16789) + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842) + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859) + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663) + * Implement nodetool getauditlog command (CASSANDRA-16725) + * Clean up repair code (CASSANDRA-13720) + * Background schedule to clean up orphaned hints files (CASSANDRA-16815) + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776) + * Batch the token metadata update to improve the speed (CASSANDRA-15291) + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775) + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760) + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671) + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510) + * Log when compacting many tombstones (CASSANDRA-16780) + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799) + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701) + * Add a system property to set hostId if not yet initialized (CASSANDRA-14582) + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651) + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 (CASSANDRA-17040) +Merged from 4.0: + * Clean up schema migration coordinator and tests (CASSANDRA-17533) * Shut repair task executor down without interruption to avoid compromising shared channel proxies (CASSANDRA-17466) * Generate valid KEYSPACE / MATERIALIZED VIEW for CQL for views (CASSANDRA-17266) * Fix timestamp tz parsing (CASSANDRA-17467) diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index b6bcebb7bf,4afb1ee9e2..3b63cf8d03 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@@ -156,10 -152,10 +156,14 @@@ public enum CassandraRelevantPropertie */ GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"), + GOSSIPER_SKIP_WAITING_TO_SETTLE("cassandra.skip_wait_for_gossip_to_settle", "-1"), + + IGNORED_SCHEMA_CHECK_VERSIONS("cassandra.skip_schema_check_for_versions"), + + IGNORED_SCHEMA_CHECK_ENDPOINTS("cassandra.skip_schema_check_for_endpoints"), + + SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"), + /** * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known * but the current state isn't known. If the host replacement is needed to repair this state, this property must diff --cc src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java index 437f48918d,0000000000..1ccecc60fc mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java +++ b/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java @@@ -1,283 -1,0 +1,284 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.SchemaTransformation.SchemaTransformationResult; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; + +import static org.apache.cassandra.schema.MigrationCoordinator.MAX_OUTSTANDING_VERSION_REQUESTS; + +public class DefaultSchemaUpdateHandler implements SchemaUpdateHandler, IEndpointStateChangeSubscriber +{ + private static final Logger logger = LoggerFactory.getLogger(DefaultSchemaUpdateHandler.class); + + @VisibleForTesting + final MigrationCoordinator migrationCoordinator; + + private final boolean requireSchemas; + private final BiConsumer<SchemaTransformationResult, Boolean> updateCallback; + private volatile DistributedSchema schema = DistributedSchema.EMPTY; + + private MigrationCoordinator createMigrationCoordinator(MessagingService messagingService) + { + return new MigrationCoordinator(messagingService, + Stage.MIGRATION.executor(), + ScheduledExecutors.scheduledTasks, + MAX_OUTSTANDING_VERSION_REQUESTS, + Gossiper.instance, + () -> schema.getVersion(), + (from, mutations) -> applyMutations(mutations)); + } + + public DefaultSchemaUpdateHandler(BiConsumer<SchemaTransformationResult, Boolean> updateCallback) + { + this(null, MessagingService.instance(), !CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean(), updateCallback); + } + + public DefaultSchemaUpdateHandler(MigrationCoordinator migrationCoordinator, + MessagingService messagingService, + boolean requireSchemas, + BiConsumer<SchemaTransformationResult, Boolean> updateCallback) + { + this.requireSchemas = requireSchemas; + this.updateCallback = updateCallback; + this.migrationCoordinator = migrationCoordinator == null ? createMigrationCoordinator(messagingService) : migrationCoordinator; + Gossiper.instance.register(this); + SchemaPushVerbHandler.instance.register(msg -> applyMutations(msg.payload)); + SchemaPullVerbHandler.instance.register(msg -> messagingService.send(msg.responseWith(getSchemaMutations()), msg.from())); + } + + public synchronized void start() + { + if (StorageService.instance.isReplacing()) + onRemove(DatabaseDescriptor.getReplaceAddress()); + + SchemaKeyspace.saveSystemKeyspacesSchema(); + + migrationCoordinator.start(); + } + + @Override + public boolean waitUntilReady(Duration timeout) + { + logger.debug("Waiting for schema to be ready (max {})", timeout); + boolean schemasReceived = migrationCoordinator.awaitSchemaRequests(timeout.toMillis()); + + if (schemasReceived) + return true; + + logger.warn("There are nodes in the cluster with a different schema version than us, from which we did not merge schemas: " + + "our version: ({}), outstanding versions -> endpoints: {}. Use -D{}}=true to ignore this, " + + "-D{}=<ep1[,epN]> to skip specific endpoints, or -D{}=<ver1[,verN]> to skip specific schema versions", + Schema.instance.getVersion(), + migrationCoordinator.outstandingVersions(), + CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey(), - MigrationCoordinator.IGNORED_ENDPOINTS_PROP, MigrationCoordinator.IGNORED_VERSIONS_PROP); ++ CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS.getKey(), ++ CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS.getKey()); + + if (requireSchemas) + { + logger.error("Didn't receive schemas for all known versions within the {}. Use -D{}=true to skip this check.", + timeout, CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey()); + + return false; + } + + return true; + } + + @Override + public void onRemove(InetAddressAndPort endpoint) + { + migrationCoordinator.removeAndIgnoreEndpoint(endpoint); + } + + @Override + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) + { + if (state == ApplicationState.SCHEMA) + { + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (epState != null && !Gossiper.instance.isDeadState(epState) && StorageService.instance.getTokenMetadata().isMember(endpoint)) + { + migrationCoordinator.reportEndpointVersion(endpoint, UUID.fromString(value.value)); + } + } + } + + @Override + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) + { + // no-op + } + + @Override + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + { + // no-op + } + + @Override + public void onAlive(InetAddressAndPort endpoint, EndpointState state) + { + // no-op + } + + @Override + public void onDead(InetAddressAndPort endpoint, EndpointState state) + { + // no-op + } + + @Override + public void onRestart(InetAddressAndPort endpoint, EndpointState state) + { + // no-op + } + + private synchronized SchemaTransformationResult applyMutations(Collection<Mutation> schemaMutations) + { + // fetch the current state of schema for the affected keyspaces only + DistributedSchema before = schema; + + // apply the schema mutations + SchemaKeyspace.applyChanges(schemaMutations); + + // only compare the keyspaces affected by this set of schema mutations + Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(schemaMutations); + + // apply the schema mutations and fetch the new versions of the altered keyspaces + Keyspaces updatedKeyspaces = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces); + Set<String> removedKeyspaces = affectedKeyspaces.stream().filter(ks -> !updatedKeyspaces.containsKeyspace(ks)).collect(Collectors.toSet()); + Keyspaces afterKeyspaces = before.getKeyspaces().withAddedOrReplaced(updatedKeyspaces).without(removedKeyspaces); + + Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), afterKeyspaces); + UUID version = SchemaKeyspace.calculateSchemaDigest(); + DistributedSchema after = new DistributedSchema(afterKeyspaces, version); + SchemaTransformationResult update = new SchemaTransformationResult(before, after, diff); + + updateSchema(update, false); + return update; + } + + @Override + public synchronized SchemaTransformationResult apply(SchemaTransformation transformation, boolean local) + { + DistributedSchema before = schema; + Keyspaces afterKeyspaces = transformation.apply(before.getKeyspaces()); + Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), afterKeyspaces); + + if (diff.isEmpty()) + return new SchemaTransformationResult(before, before, diff); + + Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, transformation.fixedTimestampMicros().orElse(FBUtilities.timestampMicros())); + SchemaKeyspace.applyChanges(mutations); + + DistributedSchema after = new DistributedSchema(afterKeyspaces, SchemaKeyspace.calculateSchemaDigest()); + SchemaTransformationResult update = new SchemaTransformationResult(before, after, diff); + + updateSchema(update, local); + if (!local) + { + migrationCoordinator.executor.submit(() -> { + Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> endpoints = migrationCoordinator.pushSchemaMutations(mutations); + SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(endpoints.left(), endpoints.right(), transformation); + }); + } + + return update; + } + + private void updateSchema(SchemaTransformationResult update, boolean local) + { + this.schema = update.after; + logger.debug("Schema updated: {}", update); + updateCallback.accept(update, true); + if (!local) + { + migrationCoordinator.announce(update.after.getVersion()); + } + } + + private synchronized SchemaTransformationResult reload() + { + DistributedSchema before = this.schema; + DistributedSchema after = new DistributedSchema(SchemaKeyspace.fetchNonSystemKeyspaces(), SchemaKeyspace.calculateSchemaDigest()); + Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), after.getKeyspaces()); + SchemaTransformationResult update = new SchemaTransformationResult(before, after, diff); + + updateSchema(update, false); + return update; + } + + @Override + public SchemaTransformationResult reset(boolean local) + { + return local + ? reload() + : migrationCoordinator.pullSchemaFromAnyNode() + .flatMap(mutations -> ImmediateFuture.success(applyMutations(mutations))) + .awaitThrowUncheckedOnInterrupt() + .getNow(); + } + + @Override + public synchronized void clear() + { + SchemaKeyspace.truncate(); + this.schema = DistributedSchema.EMPTY; + } + + private synchronized Collection<Mutation> getSchemaMutations() + { + return SchemaKeyspace.convertSchemaToMutations(); + } + + public Map<UUID, Set<InetAddressAndPort>> getOutstandingSchemaVersions() + { + return migrationCoordinator.outstandingVersions(); + } +} diff --cc src/java/org/apache/cassandra/schema/MigrationCoordinator.java index 54da2ed783,bf3aee70fc..e3153e1e6f --- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java +++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java @@@ -29,16 -28,14 +29,18 @@@ import java.util.HashMap import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; + import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + import java.util.function.LongSupplier; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@@ -62,43 -59,34 +64,50 @@@ import org.apache.cassandra.net.Messagi import org.apache.cassandra.net.NoPayload; import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.Simulate; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.concurrent.WaitQueue; + import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS; + import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS; - +import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.Simulate.With.MONITORS; +import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; + +/** + * Migration coordinator is responsible for tracking schema versions on various nodes and, if needed, synchronize the + * schema. It performs periodic checks and if there is a schema version mismatch between the current node and the other + * node, it pulls the schema and applies the changes locally through the callback. + * + * It works in close cooperation with {@link DefaultSchemaUpdateHandler} which is responsible for maintaining local + * schema metadata stored in {@link SchemaKeyspace}. + */ +@Simulate(with = MONITORS) public class MigrationCoordinator { private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class); - private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null); + private static final Future<Void> FINISHED_FUTURE = ImmediateFuture.success(null); + private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime(); + + @VisibleForTesting + public static void setUptimeFn(LongSupplier supplier) + { + getUptimeFn = supplier; + } + - - private static final int MIGRATION_DELAY_IN_MS = 60000; - private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3; - - public static final MigrationCoordinator instance = new MigrationCoordinator(); + private static final int MIGRATION_DELAY_IN_MS = CassandraRelevantProperties.MIGRATION_DELAY.getInt(); + public static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3; - public static final String IGNORED_VERSIONS_PROP = "cassandra.skip_schema_check_for_versions"; - public static final String IGNORED_ENDPOINTS_PROP = "cassandra.skip_schema_check_for_endpoints"; - private static ImmutableSet<UUID> getIgnoredVersions() { - String s = System.getProperty(IGNORED_VERSIONS_PROP); + String s = IGNORED_SCHEMA_CHECK_VERSIONS.getString(); if (s == null || s.isEmpty()) return ImmutableSet.of(); @@@ -332,10 -316,10 +341,10 @@@ return true; } - @VisibleForTesting - protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version) + private boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version) { - if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS) + UUID localSchemaVersion = schemaVersion.get(); - if (SchemaConstants.emptyVersion.equals(localSchemaVersion) || ManagementFactory.getRuntimeMXBean().getUptime() < MIGRATION_DELAY_IN_MS) ++ if (SchemaConstants.emptyVersion.equals(localSchemaVersion) || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS) { // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately logger.debug("Immediately submitting migration task for {}, " + @@@ -424,59 -429,29 +433,72 @@@ return task; } - private static Future<?> submitToMigrationIfNotShutdown(Runnable task) + private Future<Collection<Mutation>> pullSchemaFrom(InetAddressAndPort endpoint) + { + AsyncPromise<Collection<Mutation>> result = new AsyncPromise<>(); + return submitToMigrationIfNotShutdown(() -> pullSchema(endpoint, new RequestCallback<Collection<Mutation>>() + { + @Override + public void onResponse(Message<Collection<Mutation>> msg) + { + result.setSuccess(msg.payload); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + result.setFailure(new RuntimeException("Failed to get schema from " + from + ". The failure reason was: " + failureReason)); + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + })).flatMap(ignored -> result); + } + + Future<Collection<Mutation>> pullSchemaFromAnyNode() + { + Optional<InetAddressAndPort> endpoint = gossiper.getLiveMembers() + .stream() + .filter(this::shouldPullFromEndpoint) + .findFirst(); + + return endpoint.map(this::pullSchemaFrom).orElse(ImmediateFuture.success(Collections.emptyList())); + } + + + void announce(UUID schemaVersion) + { + if (gossiper.isEnabled()) + gossiper.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(schemaVersion)); + SchemaDiagnostics.versionAnnounced(Schema.instance); + } + - private Future<Void> submitToMigrationIfNotShutdown(Runnable task) ++ private Future<?> submitToMigrationIfNotShutdown(Runnable task) { - if (executor.isShutdown() || executor.isTerminated()) + boolean skipped = false; + try { - logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown."); - if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated()) ++ if (executor.isShutdown() || executor.isTerminated()) + { + skipped = true; - return null; ++ return ImmediateFuture.success(null); + } - return Stage.MIGRATION.submit(task); ++ return executor.submit(task); + } + catch (RejectedExecutionException ex) + { + skipped = true; - return null; + return ImmediateFuture.success(null); } - else + finally { - return executor.submit(task, null); + if (skipped) + { + logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown."); + } } } diff --cc test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java index fc377a65c0,a763b45fcb..d6aee7cc23 --- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java @@@ -50,7 -50,7 +50,8 @@@ import org.apache.cassandra.service.Sto import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + import static org.junit.Assert.assertTrue; public class GossipHelper { @@@ -227,8 -222,8 +228,8 @@@ pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> { InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom); EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - MigrationCoordinator.instance.reportEndpointVersion(endpoint, state); - assertTrue("schema is ready", MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10))); + Gossiper.instance.doOnChangeNotifications(endpoint, ApplicationState.SCHEMA, state.getApplicationState(ApplicationState.SCHEMA)); - Schema.instance.waitUntilReady(Duration.ofSeconds(10)); ++ assertTrue("schema is ready", Schema.instance.waitUntilReady(Duration.ofSeconds(10))); }).accept(pullFrom); } } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 90bf1fb535,b2edb4bff1..e283bb6476 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -35,8 -35,10 +35,10 @@@ import java.util.UUID import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import javax.management.ListenerNotFoundException; import javax.management.Notification; import javax.management.NotificationListener; @@@ -159,25 -146,14 +162,25 @@@ public class Instance extends IsolatedE { public final IInstanceConfig config; private volatile boolean initialized = false; - private final long startedAt; + private final AtomicLong startedAt = new AtomicLong(); - // should never be invoked directly, so that it is instantiated on other class loader; - // only visible for inheritance + @Deprecated Instance(IInstanceConfig config, ClassLoader classLoader) { - super("node" + config.num(), classLoader); + this(config, classLoader, null); + } + + Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem fileSystem) + { + this(config, classLoader, fileSystem, null); + } + + Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem fileSystem, ShutdownExecutor shutdownExecutor) + { + super("node" + config.num(), classLoader, executorFactory().pooled("isolatedExecutor", Integer.MAX_VALUE), shutdownExecutor); this.config = config; + if (fileSystem != null) + File.unsafeSetFilesystem(fileSystem); Object clusterId = Objects.requireNonNull(config.get(Constants.KEY_DTEST_API_CLUSTER_ID), "cluster_id is not defined"); ClusterIDDefiner.setId("cluster-" + clusterId); InstanceIDDefiner.setInstanceId(config.num()); @@@ -538,6 -458,8 +536,12 @@@ @Override public void startup(ICluster cluster) { ++ // Defer initialisation of Clock.Global until cluster/instance identifiers are set. ++ // Otherwise, the instance classloader's logging classes are setup ahead of time and ++ // the patterns/file paths are not set correctly. This will be addressed in a subsequent ++ // commit to extend the functionality of the @Shared annotation to app classes. + assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt uninitialized"; + sync(() -> { try { @@@ -631,20 -544,9 +635,21 @@@ StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting()); if (config.has(GOSSIP)) { - MigrationCoordinator.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt.get())); - StorageService.instance.initServer(); ++ MigrationCoordinator.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(nanoTime() - startedAt.get())); + try + { + StorageService.instance.initServer(); + } + catch (Exception e) + { + // I am tired of looking up my notes for how to fix this... so why not tell the user? + Throwable cause = com.google.common.base.Throwables.getRootCause(e); + if (cause instanceof BindException && "Can't assign requested address".equals(cause.getMessage())) + throw new RuntimeException("Unable to bind, run the following in a termanl and try again:\nfor subnet in $(seq 0 5); do for id in $(seq 0 5); do sudo ifconfig lo0 alias \"127.0.$subnet.$id\"; done; done;", e); + throw e; + } StorageService.instance.removeShutdownHook(); + Gossiper.waitToSettle(); } else @@@ -803,17 -755,9 +808,18 @@@ Throwables.maybeFail(error); }).apply(isolatedExecutor); - return CompletableFuture.runAsync(ThrowingRunnable.toRunnable(future::get), isolatedExecutor) - .thenRun(super::shutdown) - .thenRun(() -> startedAt.set(0L)); + return isolatedExecutor.submit(() -> { + try + { + future.get(); + return null; + } + finally + { + super.shutdown(); ++ startedAt.set(0L); + } + }); } @Override diff --cc test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceSyncSchemaForBootstrap.java index bf10db2d0a,0000000000..c8bc9f741e mode 100644,000000..100644 --- a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceSyncSchemaForBootstrap.java +++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceSyncSchemaForBootstrap.java @@@ -1,36 -1,0 +1,37 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.simulator.cluster; + +import java.time.Duration; + +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.simulator.systems.SimulatedActionTask; + +import static org.apache.cassandra.simulator.Action.Modifier.DISPLAY_ORIGIN; +import static org.apache.cassandra.simulator.Action.Modifiers.RELIABLE_NO_TIMEOUTS; ++import static org.junit.Assert.assertTrue; + +class OnInstanceSyncSchemaForBootstrap extends SimulatedActionTask +{ + public OnInstanceSyncSchemaForBootstrap(ClusterActions actions, int node) + { + super("Sync Schema on " + node, RELIABLE_NO_TIMEOUTS.with(DISPLAY_ORIGIN), RELIABLE_NO_TIMEOUTS, actions, actions.cluster.get(node), - () -> Schema.instance.waitUntilReady(Duration.ofMinutes(10))); ++ () -> assertTrue("schema is ready", Schema.instance.waitUntilReady(Duration.ofMinutes(10)))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org