This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e3b3a59c6c2832bcbd3850340f0138e876cfcb5d
Merge: 2fde9af74f ce6a65cb29
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Fri Apr 8 14:39:37 2022 -0600

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |  1 +
 .../config/CassandraRelevantProperties.java        |  4 ++
 .../schema/DefaultSchemaUpdateHandler.java         |  3 +-
 .../cassandra/schema/MigrationCoordinator.java     | 44 ++++++++++++++++------
 .../cassandra/distributed/action/GossipHelper.java |  3 +-
 .../cassandra/distributed/impl/Instance.java       | 18 ++++++---
 .../distributed/test/MigrationCoordinatorTest.java | 12 +++---
 .../cluster/OnInstanceSyncSchemaForBootstrap.java  |  3 +-
 8 files changed, 63 insertions(+), 25 deletions(-)

diff --cc CHANGES.txt
index 8a52c9b32e,32fdd68552..d450f81726
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,138 -1,5 +1,139 @@@
 -4.0.4
 +4.1
 + * Clients using JMX are unable to handle non-standard java types but we leak 
this into our interfaces (CASSANDRA-17527)
 + * Remove stress server functionality (CASSANDRA-17535)
 + * Reduce histogram snapshot long[] allocation overhead during speculative 
read and write threshold updates (CASSANDRA-17523)
 + * Add guardrail for creation of secondary indexes (CASSANDRA-17498)
 + * Add guardrail to disallow creation of uncompressed tables (CASSANDRA-17504)
 + * Add guardrail to disallow creation of new COMPACT STORAGE tables 
(CASSANDRA-17522)
 + * repair vtables should expose a completed field due to lack of filtering 
options in CQL (CASSANDRA-17520)
 + * remove outdated code from cqlsh (CASSANDRA-17490)
 + * remove support for deprecated version specific TLS in Python 3.6 
(CASSANDRA-17365)
 + * Add support for IF EXISTS and IF NOT EXISTS in ALTER statements 
(CASSANDRA-16916)
 + * resolve several pylint issues in cqlsh.py and pylib (CASSANDRA-17480)
 + * Streaming sessions longer than 3 minutes fail with timeout 
(CASSANDRA-17510)
 + * Add ability to track state in repair (CASSANDRA-15399)
 + * Remove unused 'parse' module (CASSANDRA-17484)
 + * change six functions in cqlshlib to native Python 3 (CASSANDRA-17417)
 + * reduce hot-path object allocations required to record local/remote 
requests against the client request metrics (CASSANDRA-17424)
 + * Disallow removing DC from system_auth while nodes are active in the DC 
(CASSANDRA-17478)
 + * Add guardrail for the number of fields per UDT (CASSANDRA-17385)
 + * Allow users to change cqlsh history location using env variable 
(CASSANDRA-17448)
 + * Add required -f option to use nodetool verify and standalone sstableverify 
(CASSANDRA-17017)
 + * Add support for UUID based sstable generation identifiers (CASSANDRA-17048)
 + * Log largest memtable flush at info instead of debug (CASSANDRA-17472)
 + * Add native transport rate limiter options to example cassandra.yaml, and 
expose metric for dispatch rate (CASSANDRA-17423)
 + * Add diagnostic events for guardrails (CASSANDRA-17197)
 + * Pre hashed passwords in CQL (CASSANDRA-17334)
 + * Increase cqlsh version (CASSANDRA-17432)
 + * Update SUPPORTED_UPGRADE_PATHS to include 3.0 and 3.x to 4.1 paths and 
remove obsolete tests (CASSANDRA-17362)
 + * Support DELETE in CQLSSTableWriter (CASSANDRA-14797)
 + * Failed inbound internode authentication failures generate ugly warning 
with stack trace (CASSANDRA-17068)
 + * Expose gossip information in system_views.gossip_info virtual table 
(CASSANDRA-17002)
 + * Add guardrails for collection items and size (CASSANDRA-17153)
 + * Improve guardrails messages (CASSANDRA-17430)
 + * Remove all usages of junit.framework and ban them via Checkstyle 
(CASSANDRA-17316)
 + * Add guardrails for read/write consistency levels (CASSANDRA-17188)
 + * Add guardrail for SELECT IN terms and their cartesian product 
(CASSANDRA-17187)
 + * remove unused imports in cqlsh.py and cqlshlib (CASSANDRA-17413)
 + * deprecate property windows_timer_interval (CASSANDRA-17404)
 + * Expose streaming as a vtable (CASSANDRA-17390)
 + * Expose all client options via system_views.clients and nodetool 
clientstats (CASSANDRA-16378)
 + * Make startup checks configurable (CASSANDRA-17220)
 + * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
 + * update Python test framework from nose to pytest (CASSANDRA-17293)
 + * Fix improper CDC commit log segments deletion in non-blocking mode 
(CASSANDRA-17233)
 + * Add support for string concatenations through the + operator 
(CASSANDRA-17190)
 + * Limit the maximum hints size per host (CASSANDRA-17142)
 + * Add a virtual table for exposing batch metrics (CASSANDRA-17225)
 + * Flatten guardrails config (CASSANDRA-17353)
 + * Instance failed to start up due to NPE in 
StartupClusterConnectivityChecker (CASSANDRA-17347)
 + * add the shorter version of version flag (-v) in cqlsh (CASSANDRA-17236)
 + * Make vtables accessible via internode messaging (CASSANDRA-17295)
 + * Add support for PEM based key material for SSL (CASSANDRA-17031)
 + * Standardize storage configuration parameters' names. Support unit 
suffixes. (CASSANDRA-15234)
 + * Remove support for Windows (CASSANDRA-16956)
 + * Runtime-configurable YAML option to prohibit USE statements 
(CASSANDRA-17318)
 + * When streaming sees a ClosedChannelException this triggers the disk 
failure policy (CASSANDRA-17116)
 + * Add a virtual table for exposing prepared statements metrics 
(CASSANDRA-17224)
 + * Remove python 2.x support from cqlsh (CASSANDRA-17242)
 + * Prewarm role and credential caches to avoid timeouts at startup 
(CASSANDRA-16958)
 + * Make capacity/validity/updateinterval/activeupdate for Auth Caches 
configurable via nodetool (CASSANDRA-17063)
 + * Added startup check for read_ahead_kb setting (CASSANDRA-16436)
 + * Avoid unnecessary array allocations and initializations when performing 
query checks (CASSANDRA-17209)
 + * Add guardrail for list operations that require read before write 
(CASSANDRA-17154)
 + * Migrate thresholds for number of keyspaces and tables to guardrails 
(CASSANDRA-17195)
 + * Remove self-reference in SSTableTidier (CASSANDRA-17205)
 + * Add guardrail for query page size (CASSANDRA-17189)
 + * Allow column_index_size_in_kb to be configurable through nodetool 
(CASSANDRA-17121)
 + * Emit a metric for number of local read and write calls
 + * Add non-blocking mode for CDC writes (CASSANDRA-17001)
 + * Add guardrails framework (CASSANDRA-17147)
 + * Harden resource management on SSTable components to prevent future leaks 
(CASSANDRA-17174)
 + * Make nodes more resilient to local unrelated files during startup 
(CASSANDRA-17082)
 + * repair prepare message would produce a wrong error message if network 
timeout happened rather than reply wait timeout (CASSANDRA-16992)
 + * Log queries that fail on timeout or unavailable errors up to once per 
minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error 
handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming 
(CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap 
OOMs rather than only supporting direct only (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * commit log was switched from non-daemon to daemon threads, which causes 
the JVM to exit in some case as no non-daemon threads are active 
(CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys 
(CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the 
state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node 
(CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement 
(CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent 
(CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution 
(CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and 
default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual 
table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values 
of INSERT/UPDATE statements (CASSANDRA-14337)
 + * add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is 
thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking 
(CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at 
startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much 
data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node 
(CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations 
(CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output 
(CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation 
allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` 
(CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies 
(CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go 
past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks 
(CASSANDRA-16842)
 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for 
which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit 
(CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores 
(CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL 
(CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond 
to single WaitingOnCommit data points (CASSANDRA-16701)
 + * Add a system property to set hostId if not yet initialized 
(CASSANDRA-14582)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version 
changes, fixed to rely on latest version (CASSANDRA-16651)
 + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 
(CASSANDRA-17040)
 +Merged from 4.0:
+  * Clean up schema migration coordinator and tests (CASSANDRA-17533)
   * Shut repair task executor down without interruption to avoid compromising 
shared channel proxies (CASSANDRA-17466)
   * Generate valid KEYSPACE / MATERIALIZED VIEW for CQL for views 
(CASSANDRA-17266)
   * Fix timestamp tz parsing (CASSANDRA-17467)
diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index b6bcebb7bf,4afb1ee9e2..3b63cf8d03
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@@ -156,10 -152,10 +156,14 @@@ public enum CassandraRelevantPropertie
       */
      GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"),
  
 +    
GOSSIPER_SKIP_WAITING_TO_SETTLE("cassandra.skip_wait_for_gossip_to_settle", 
"-1"),
 +
+     IGNORED_SCHEMA_CHECK_VERSIONS("cassandra.skip_schema_check_for_versions"),
+ 
+     
IGNORED_SCHEMA_CHECK_ENDPOINTS("cassandra.skip_schema_check_for_endpoints"),
+ 
 +    SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", 
"2000"),
 +
      /**
       * When doing a host replacement its possible that the gossip state is 
"empty" meaning that the endpoint is known
       * but the current state isn't known.  If the host replacement is needed 
to repair this state, this property must
diff --cc src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java
index 437f48918d,0000000000..1ccecc60fc
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java
+++ b/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java
@@@ -1,283 -1,0 +1,284 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.schema;
 +
 +import java.time.Duration;
 +import java.util.Collection;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.function.BiConsumer;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessagingService;
 +import 
org.apache.cassandra.schema.SchemaTransformation.SchemaTransformationResult;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 +
 +import static 
org.apache.cassandra.schema.MigrationCoordinator.MAX_OUTSTANDING_VERSION_REQUESTS;
 +
 +public class DefaultSchemaUpdateHandler implements SchemaUpdateHandler, 
IEndpointStateChangeSubscriber  // applies schema changes locally and feeds gossiped SCHEMA versions to the MigrationCoordinator
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(DefaultSchemaUpdateHandler.class);
 +
 +    @VisibleForTesting
 +    final MigrationCoordinator migrationCoordinator;
 +
 +    private final boolean requireSchemas;
 +    private final BiConsumer<SchemaTransformationResult, Boolean> 
updateCallback;
 +    private volatile DistributedSchema schema = DistributedSchema.EMPTY;
 +
 +    private MigrationCoordinator createMigrationCoordinator(MessagingService 
messagingService)
 +    {
 +        return new MigrationCoordinator(messagingService,
 +                                        Stage.MIGRATION.executor(),
 +                                        ScheduledExecutors.scheduledTasks,
 +                                        MAX_OUTSTANDING_VERSION_REQUESTS,
 +                                        Gossiper.instance,
 +                                        () -> schema.getVersion(),
 +                                        (from, mutations) -> 
applyMutations(mutations));
 +    }
 +
 +    public DefaultSchemaUpdateHandler(BiConsumer<SchemaTransformationResult, 
Boolean> updateCallback)
 +    {
 +        this(null, MessagingService.instance(), 
!CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean(), 
updateCallback);
 +    }
 +
 +    public DefaultSchemaUpdateHandler(MigrationCoordinator 
migrationCoordinator,
 +                                      MessagingService messagingService,
 +                                      boolean requireSchemas,
 +                                      BiConsumer<SchemaTransformationResult, 
Boolean> updateCallback)
 +    {
 +        this.requireSchemas = requireSchemas;
 +        this.updateCallback = updateCallback;
 +        this.migrationCoordinator = migrationCoordinator == null ? 
createMigrationCoordinator(messagingService) : migrationCoordinator;
 +        Gossiper.instance.register(this);
 +        SchemaPushVerbHandler.instance.register(msg -> 
applyMutations(msg.payload));
 +        SchemaPullVerbHandler.instance.register(msg -> 
messagingService.send(msg.responseWith(getSchemaMutations()), msg.from()));
 +    }
 +
 +    public synchronized void start()
 +    {
 +        if (StorageService.instance.isReplacing())
 +            onRemove(DatabaseDescriptor.getReplaceAddress());
 +
 +        SchemaKeyspace.saveSystemKeyspacesSchema();
 +
 +        migrationCoordinator.start();
 +    }
 +
 +    @Override
 +    public boolean waitUntilReady(Duration timeout)
 +    {
 +        logger.debug("Waiting for schema to be ready (max {})", timeout);
 +        boolean schemasReceived = 
migrationCoordinator.awaitSchemaRequests(timeout.toMillis());
 +
 +        if (schemasReceived)
 +            return true;
 +
 +        logger.warn("There are nodes in the cluster with a different schema 
version than us, from which we did not merge schemas: " +
 +                    "our version: ({}), outstanding versions -> endpoints: 
{}. Use -D{}=true to ignore this, " +
 +                    "-D{}=<ep1[,epN]> to skip specific endpoints, or 
-D{}=<ver1[,verN]> to skip specific schema versions",
 +                    Schema.instance.getVersion(),
 +                    migrationCoordinator.outstandingVersions(),
 +                    
CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey(),
-                     MigrationCoordinator.IGNORED_ENDPOINTS_PROP, 
MigrationCoordinator.IGNORED_VERSIONS_PROP);
++                    
CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS.getKey(),
++                    
CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS.getKey());
 +
 +        if (requireSchemas)  // false when the bootstrap schema check is skipped (see the single-argument constructor)
 +        {
 +            logger.error("Didn't receive schemas for all known versions 
within the {}. Use -D{}=true to skip this check.",
 +                         timeout, 
CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey());
 +
 +            return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    @Override
 +    public void onRemove(InetAddressAndPort endpoint)
 +    {
 +        migrationCoordinator.removeAndIgnoreEndpoint(endpoint);
 +    }
 +
 +    @Override
 +    public void onChange(InetAddressAndPort endpoint, ApplicationState state, 
VersionedValue value)
 +    {
 +        if (state == ApplicationState.SCHEMA)
 +        {
 +            EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +            if (epState != null && !Gossiper.instance.isDeadState(epState) && 
StorageService.instance.getTokenMetadata().isMember(endpoint))
 +            {
 +                migrationCoordinator.reportEndpointVersion(endpoint, 
UUID.fromString(value.value));
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
 +    {
 +        // no-op
 +    }
 +
 +    @Override
 +    public void beforeChange(InetAddressAndPort endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue)
 +    {
 +        // no-op
 +    }
 +
 +    @Override
 +    public void onAlive(InetAddressAndPort endpoint, EndpointState state)
 +    {
 +        // no-op
 +    }
 +
 +    @Override
 +    public void onDead(InetAddressAndPort endpoint, EndpointState state)
 +    {
 +        // no-op
 +    }
 +
 +    @Override
 +    public void onRestart(InetAddressAndPort endpoint, EndpointState state)
 +    {
 +        // no-op
 +    }
 +
 +    private synchronized SchemaTransformationResult 
applyMutations(Collection<Mutation> schemaMutations)
 +    {
 +        // snapshot the whole current in-memory schema to diff against afterwards
 +        DistributedSchema before = schema;
 +
 +        // apply the schema mutations
 +        SchemaKeyspace.applyChanges(schemaMutations);
 +
 +        // only compare the keyspaces affected by this set of schema mutations
 +        Set<String> affectedKeyspaces = 
SchemaKeyspace.affectedKeyspaces(schemaMutations);
 +
 +        // fetch the post-mutation versions of the affected keyspaces (the mutations were already applied above)
 +        Keyspaces updatedKeyspaces = 
SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 +        Set<String> removedKeyspaces = affectedKeyspaces.stream().filter(ks 
-> !updatedKeyspaces.containsKeyspace(ks)).collect(Collectors.toSet());
 +        Keyspaces afterKeyspaces = 
before.getKeyspaces().withAddedOrReplaced(updatedKeyspaces).without(removedKeyspaces);
 +
 +        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), 
afterKeyspaces);
 +        UUID version = SchemaKeyspace.calculateSchemaDigest();
 +        DistributedSchema after = new DistributedSchema(afterKeyspaces, 
version);
 +        SchemaTransformationResult update = new 
SchemaTransformationResult(before, after, diff);
 +
 +        updateSchema(update, false);
 +        return update;
 +    }
 +
 +    @Override
 +    public synchronized SchemaTransformationResult apply(SchemaTransformation 
transformation, boolean local)
 +    {
 +        DistributedSchema before = schema;
 +        Keyspaces afterKeyspaces = 
transformation.apply(before.getKeyspaces());
 +        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), 
afterKeyspaces);
 +
 +        if (diff.isEmpty())
 +            return new SchemaTransformationResult(before, before, diff);
 +
 +        Collection<Mutation> mutations = 
SchemaKeyspace.convertSchemaDiffToMutations(diff, 
transformation.fixedTimestampMicros().orElse(FBUtilities.timestampMicros()));
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        DistributedSchema after = new DistributedSchema(afterKeyspaces, 
SchemaKeyspace.calculateSchemaDigest());
 +        SchemaTransformationResult update = new 
SchemaTransformationResult(before, after, diff);
 +
 +        updateSchema(update, local);
 +        if (!local)
 +        {
 +            migrationCoordinator.executor.submit(() -> {  // push the mutations to peers off the caller's thread
 +                Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> 
endpoints = migrationCoordinator.pushSchemaMutations(mutations);
 +                
SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(endpoints.left(), 
endpoints.right(), transformation);
 +            });
 +        }
 +
 +        return update;
 +    }
 +
 +    private void updateSchema(SchemaTransformationResult update, boolean 
local)
 +    {
 +        this.schema = update.after;
 +        logger.debug("Schema updated: {}", update);
 +        updateCallback.accept(update, true);
 +        if (!local)
 +        {
 +            migrationCoordinator.announce(update.after.getVersion());
 +        }
 +    }
 +
 +    private synchronized SchemaTransformationResult reload()
 +    {
 +        DistributedSchema before = this.schema;
 +        DistributedSchema after = new 
DistributedSchema(SchemaKeyspace.fetchNonSystemKeyspaces(), 
SchemaKeyspace.calculateSchemaDigest());
 +        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), 
after.getKeyspaces());
 +        SchemaTransformationResult update = new 
SchemaTransformationResult(before, after, diff);
 +
 +        updateSchema(update, false);
 +        return update;
 +    }
 +
 +    @Override
 +    public SchemaTransformationResult reset(boolean local)
 +    {
 +        return local
 +               ? reload()
 +               : migrationCoordinator.pullSchemaFromAnyNode()
 +                                     .flatMap(mutations -> 
ImmediateFuture.success(applyMutations(mutations)))
 +                                     .awaitThrowUncheckedOnInterrupt()
 +                                     .getNow();
 +    }
 +
 +    @Override
 +    public synchronized void clear()
 +    {
 +        SchemaKeyspace.truncate();
 +        this.schema = DistributedSchema.EMPTY;
 +    }
 +
 +    private synchronized Collection<Mutation> getSchemaMutations()
 +    {
 +        return SchemaKeyspace.convertSchemaToMutations();
 +    }
 +
 +    public Map<UUID, Set<InetAddressAndPort>> getOutstandingSchemaVersions()
 +    {
 +        return migrationCoordinator.outstandingVersions();
 +    }
 +}
diff --cc src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 54da2ed783,bf3aee70fc..e3153e1e6f
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@@ -29,16 -28,14 +29,18 @@@ import java.util.HashMap
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
  import java.util.Set;
  import java.util.UUID;
 -import java.util.concurrent.Future;
 -import java.util.concurrent.FutureTask;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ScheduledFuture;
+ import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.BiConsumer;
+ import java.util.function.LongSupplier;
 +import java.util.function.Supplier;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
@@@ -62,43 -59,34 +64,50 @@@ import org.apache.cassandra.net.Messagi
  import org.apache.cassandra.net.NoPayload;
  import org.apache.cassandra.net.RequestCallback;
  import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.Simulate;
 +import org.apache.cassandra.utils.concurrent.AsyncPromise;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.ImmediateFuture;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
  
+ import static 
org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS;
+ import static 
org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS;
 -
 +import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ;
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static org.apache.cassandra.utils.Simulate.With.MONITORS;
 +import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
 +
 +/**
 + * Migration coordinator is responsible for tracking schema versions on 
various nodes and, if needed, synchronizing the
 + * schema. It performs periodic checks and if there is a schema version 
mismatch between the current node and the other
 + * node, it pulls the schema and applies the changes locally through the 
callback.
 + *
 + * It works in close cooperation with {@link DefaultSchemaUpdateHandler} 
which is responsible for maintaining local
 + * schema metadata stored in {@link SchemaKeyspace}.
 + */
 +@Simulate(with = MONITORS)
  public class MigrationCoordinator
  {
      private static final Logger logger = 
LoggerFactory.getLogger(MigrationCoordinator.class);
 -    private static final Future<Void> FINISHED_FUTURE = 
Futures.immediateFuture(null);
 +    private static final Future<Void> FINISHED_FUTURE = 
ImmediateFuture.success(null);
  
+     private static volatile LongSupplier getUptimeFn = () -> 
ManagementFactory.getRuntimeMXBean().getUptime();  // volatile: setUptimeFn (a test hook) may run on a different thread than the reads in shouldPullImmediately
+ 
+     @VisibleForTesting
+     public static void setUptimeFn(LongSupplier supplier)  // test hook: replaces the JVM-uptime source compared against MIGRATION_DELAY_IN_MS
+     {
+         getUptimeFn = supplier;
+     }
+ 
 -
 -    private static final int MIGRATION_DELAY_IN_MS = 60000;
 -    private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
 -
 -    public static final MigrationCoordinator instance = new 
MigrationCoordinator();
 +    private static final int MIGRATION_DELAY_IN_MS = 
CassandraRelevantProperties.MIGRATION_DELAY.getInt();
 +    public static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
  
-     public static final String IGNORED_VERSIONS_PROP = 
"cassandra.skip_schema_check_for_versions";
-     public static final String IGNORED_ENDPOINTS_PROP = 
"cassandra.skip_schema_check_for_endpoints";
- 
      private static ImmutableSet<UUID> getIgnoredVersions()
      {
-         String s = System.getProperty(IGNORED_VERSIONS_PROP);
+         String s = IGNORED_SCHEMA_CHECK_VERSIONS.getString();
          if (s == null || s.isEmpty())
              return ImmutableSet.of();
  
@@@ -332,10 -316,10 +341,10 @@@
          return true;
      }
  
 -    @VisibleForTesting
 -    protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID 
version)
 +    private boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID 
version)
      {
 -        if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < 
MIGRATION_DELAY_IN_MS)
 +        UUID localSchemaVersion = schemaVersion.get();
-         if (SchemaConstants.emptyVersion.equals(localSchemaVersion) || 
ManagementFactory.getRuntimeMXBean().getUptime() < MIGRATION_DELAY_IN_MS)
++        if (SchemaConstants.emptyVersion.equals(localSchemaVersion) || 
getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS)
          {
              // If we think we may be bootstrapping or have recently started, 
submit MigrationTask immediately
              logger.debug("Immediately submitting migration task for {}, " +
@@@ -424,59 -429,29 +433,72 @@@
          return task;
      }
  
 -    private static Future<?> submitToMigrationIfNotShutdown(Runnable task)
 +    private Future<Collection<Mutation>> pullSchemaFrom(InetAddressAndPort 
endpoint)
 +    {
 +        AsyncPromise<Collection<Mutation>> result = new AsyncPromise<>();
 +        return submitToMigrationIfNotShutdown(() -> pullSchema(endpoint, new 
RequestCallback<Collection<Mutation>>()
 +        {
 +            @Override
 +            public void onResponse(Message<Collection<Mutation>> msg)
 +            {
 +                result.setSuccess(msg.payload);
 +            }
 +
 +            @Override
 +            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
 +            {
 +                result.setFailure(new RuntimeException("Failed to get schema 
from " + from + ". The failure reason was: " + failureReason));
 +            }
 +
 +            @Override
 +            public boolean invokeOnFailure()
 +            {
 +                return true;
 +            }
 +        })).flatMap(ignored -> result);
 +    }
 +
 +    Future<Collection<Mutation>> pullSchemaFromAnyNode()
 +    {
 +        Optional<InetAddressAndPort> endpoint = gossiper.getLiveMembers()
 +                                                        .stream()
 +                                                        
.filter(this::shouldPullFromEndpoint)
 +                                                        .findFirst();
 +
 +        return 
endpoint.map(this::pullSchemaFrom).orElse(ImmediateFuture.success(Collections.emptyList()));
 +    }
 +
 +
 +    void announce(UUID schemaVersion)
 +    {
 +        if (gossiper.isEnabled())
 +            gossiper.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(schemaVersion));
 +        SchemaDiagnostics.versionAnnounced(Schema.instance);
 +    }
 +
-     private Future<Void> submitToMigrationIfNotShutdown(Runnable task)
++    private Future<?> submitToMigrationIfNotShutdown(Runnable task)
      {
-         if (executor.isShutdown() || executor.isTerminated())
+         boolean skipped = false;
+         try
          {
-             logger.info("Skipped scheduled pulling schema from other nodes: 
the MIGRATION executor service has been shutdown.");
 -            if (Stage.MIGRATION.executor().isShutdown() || 
Stage.MIGRATION.executor().isTerminated())
++            if (executor.isShutdown() || executor.isTerminated())
+             {
+                 skipped = true;
 -                return null;
++                return ImmediateFuture.success(null);
+             }
 -            return Stage.MIGRATION.submit(task);
++            return executor.submit(task);
+         }
+         catch (RejectedExecutionException ex)
+         {
+             skipped = true;
 -            return null;
 +            return ImmediateFuture.success(null);
          }
-         else
+         finally
          {
-             return executor.submit(task, null);
+             if (skipped)
+             {
+                 logger.info("Skipped scheduled pulling schema from other 
nodes: the MIGRATION executor service has been shutdown.");
+             }
          }
      }
  
diff --cc 
test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index fc377a65c0,a763b45fcb..d6aee7cc23
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@@ -50,7 -50,7 +50,8 @@@ import org.apache.cassandra.service.Sto
  import org.apache.cassandra.utils.FBUtilities;
  
  import static 
org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
 +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+ import static org.junit.Assert.assertTrue;
  
  public class GossipHelper
  {
@@@ -227,8 -222,8 +228,8 @@@
              pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
                  InetAddressAndPort endpoint = 
toCassandraInetAddressAndPort(pullFrom);
                  EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 -                MigrationCoordinator.instance.reportEndpointVersion(endpoint, 
state);
 -                assertTrue("schema is ready", 
MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10)));
 +                Gossiper.instance.doOnChangeNotifications(endpoint, 
ApplicationState.SCHEMA, state.getApplicationState(ApplicationState.SCHEMA));
-                 Schema.instance.waitUntilReady(Duration.ofSeconds(10));
++                assertTrue("schema is ready", 
Schema.instance.waitUntilReady(Duration.ofSeconds(10)));
              }).accept(pullFrom);
          }
      }
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 90bf1fb535,b2edb4bff1..e283bb6476
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -35,8 -35,10 +35,10 @@@ import java.util.UUID
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Stream;
  import javax.management.ListenerNotFoundException;
  import javax.management.Notification;
  import javax.management.NotificationListener;
@@@ -159,25 -146,14 +162,25 @@@ public class Instance extends IsolatedE
  {
      public final IInstanceConfig config;
      private volatile boolean initialized = false;
-     private final long startedAt;
+     private final AtomicLong startedAt = new AtomicLong();
  
 -    // should never be invoked directly, so that it is instantiated on other 
class loader;
 -    // only visible for inheritance
 +    @Deprecated
      Instance(IInstanceConfig config, ClassLoader classLoader)
      {
 -        super("node" + config.num(), classLoader);
 +        this(config, classLoader, null);
 +    }
 +
 +    Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem 
fileSystem)
 +    {
 +        this(config, classLoader, fileSystem, null);
 +    }
 +
 +    Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem 
fileSystem, ShutdownExecutor shutdownExecutor)
 +    {
 +        super("node" + config.num(), classLoader, 
executorFactory().pooled("isolatedExecutor", Integer.MAX_VALUE), 
shutdownExecutor);
          this.config = config;
 +        if (fileSystem != null)
 +            File.unsafeSetFilesystem(fileSystem);
          Object clusterId = 
Objects.requireNonNull(config.get(Constants.KEY_DTEST_API_CLUSTER_ID), 
"cluster_id is not defined");
          ClusterIDDefiner.setId("cluster-" + clusterId);
          InstanceIDDefiner.setInstanceId(config.num());
@@@ -538,6 -458,8 +536,12 @@@
      @Override
      public void startup(ICluster cluster)
      {
++        // Defer initialisation of Clock.Global until cluster/instance 
identifiers are set.
++        // Otherwise, the instance classloader's logging classes are setup 
ahead of time and
++        // the patterns/file paths are not set correctly. This will be 
addressed in a subsequent
++        // commit to extend the functionality of the @Shared annotation to 
app classes.
+         assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt 
uninitialized";
+ 
          sync(() -> {
              try
              {
@@@ -631,20 -544,9 +635,21 @@@
                  
StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
                  if (config.has(GOSSIP))
                  {
 -                    MigrationCoordinator.setUptimeFn(() -> 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt.get()));
 -                    StorageService.instance.initServer();
++                    MigrationCoordinator.setUptimeFn(() -> 
TimeUnit.NANOSECONDS.toMillis(nanoTime() - startedAt.get()));
 +                    try
 +                    {
 +                        StorageService.instance.initServer();
 +                    }
 +                    catch (Exception e)
 +                    {
 +                        // I am tired of looking up my notes for how to fix 
this... so why not tell the user?
 +                        Throwable cause = 
com.google.common.base.Throwables.getRootCause(e);
 +                        if (cause instanceof BindException && "Can't assign 
requested address".equals(cause.getMessage()))
 +                            throw new RuntimeException("Unable to bind, run 
the following in a termanl and try again:\nfor subnet in $(seq 0 5); do for id 
in $(seq 0 5); do sudo ifconfig lo0 alias \"127.0.$subnet.$id\"; done; done;", 
e);
 +                        throw e;
 +                    }
                      StorageService.instance.removeShutdownHook();
 +
                      Gossiper.waitToSettle();
                  }
                  else
@@@ -803,17 -755,9 +808,18 @@@
              Throwables.maybeFail(error);
          }).apply(isolatedExecutor);
  
 -        return 
CompletableFuture.runAsync(ThrowingRunnable.toRunnable(future::get), 
isolatedExecutor)
 -                                .thenRun(super::shutdown)
 -                                .thenRun(() -> startedAt.set(0L));
 +        return isolatedExecutor.submit(() -> {
 +            try
 +            {
 +                future.get();
 +                return null;
 +            }
 +            finally
 +            {
 +                super.shutdown();
++                startedAt.set(0L);
 +            }
 +        });
      }
  
      @Override
diff --cc 
test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceSyncSchemaForBootstrap.java
index bf10db2d0a,0000000000..c8bc9f741e
mode 100644,000000..100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceSyncSchemaForBootstrap.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceSyncSchemaForBootstrap.java
@@@ -1,36 -1,0 +1,37 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.simulator.cluster;
 +
 +import java.time.Duration;
 +
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.simulator.systems.SimulatedActionTask;
 +
 +import static org.apache.cassandra.simulator.Action.Modifier.DISPLAY_ORIGIN;
 +import static 
org.apache.cassandra.simulator.Action.Modifiers.RELIABLE_NO_TIMEOUTS;
++import static org.junit.Assert.assertTrue;
 +
 +class OnInstanceSyncSchemaForBootstrap extends SimulatedActionTask
 +{
 +    public OnInstanceSyncSchemaForBootstrap(ClusterActions actions, int node)
 +    {
 +        super("Sync Schema on " + node, 
RELIABLE_NO_TIMEOUTS.with(DISPLAY_ORIGIN), RELIABLE_NO_TIMEOUTS, actions, 
actions.cluster.get(node),
-               () -> Schema.instance.waitUntilReady(Duration.ofMinutes(10)));
++              () -> assertTrue("schema is ready", 
Schema.instance.waitUntilReady(Duration.ofMinutes(10))));
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to