This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit df1a2d4c3db4ff016d03d5403ac68778a71d5759
Merge: 76f8333 98e798f
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Jan 17 09:13:32 2022 +0100

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/repair/RepairMessageVerbHandler.java |   1 +
 .../schema/SystemDistributedKeyspace.java          |   2 +-
 .../cassandra/service/ActiveRepairService.java     |  37 ++++-
 .../distributed/test/ClearSnapshotTest.java        | 170 +++++++++++++++++++++
 5 files changed, 206 insertions(+), 5 deletions(-)
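
Editor's context: the substantive change in this merge (CASSANDRA-17168, "Don't block gossip when clearing repair snapshots") is in ActiveRepairService below: snapshot deletion moves off the calling (gossip) stage onto a dedicated single-threaded "RepairClearSnapshot" executor, so slow disk I/O can no longer stall gossip. A minimal JDK-only sketch of that pattern follows; Cassandra's internal ExecutorPlus/executorFactory() API is approximated with a plain ThreadPoolExecutor, and SnapshotOwner is a hypothetical stand-in for ColumnFamilyStore.

    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    interface SnapshotOwner
    {
        boolean snapshotExists(String name);
        void clearSnapshot(String name);   // potentially slow disk I/O
    }

    final class AsyncSnapshotClearer
    {
        // One thread with a 1h keep-alive, mirroring the diff's
        // configurePooled("RepairClearSnapshot", 1).withKeepAlive(1, TimeUnit.HOURS).
        private final ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, new LinkedBlockingQueue<>());

        AsyncSnapshotClearer()
        {
            executor.allowCoreThreadTimeOut(true); // let the idle thread die after 1h
        }

        // Returns immediately; deletion runs in the background, so the caller
        // (the gossip stage, in Cassandra's case) never waits on disk.
        void clearAsync(List<? extends SnapshotOwner> owners, String snapshotName)
        {
            executor.submit(() -> {
                long startNanos = System.nanoTime();
                for (SnapshotOwner owner : owners)
                    if (owner.snapshotExists(snapshotName))
                        owner.clearSnapshot(snapshotName);
                System.out.printf("Cleared snapshots in %dms%n",
                                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            });
        }
    }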

diff --cc CHANGES.txt
index c3d1e81,0896d56..e5a1a8b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,82 -1,5 +1,83 @@@
 -4.0.2
 +4.1
 + * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958)
 + * Make capacity/validity/updateinterval/activeupdate for Auth Caches configurable via nodetool (CASSANDRA-17063)
 + * Added startup check for read_ahead_kb setting (CASSANDRA-16436)
 + * Avoid unnecessary array allocations and initializations when performing query checks (CASSANDRA-17209)
 + * Add guardrail for list operations that require read before write (CASSANDRA-17154)
 + * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195)
 + * Remove self-reference in SSTableTidier (CASSANDRA-17205)
 + * Add guardrail for query page size (CASSANDRA-17189)
 + * Allow column_index_size_in_kb to be configurable through nodetool (CASSANDRA-17121)
 + * Emit a metric for number of local read and write calls
 + * Add non-blocking mode for CDC writes (CASSANDRA-17001)
 + * Add guardrails framework (CASSANDRA-17147)
 + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
 + * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082)
 + * Repair prepare message would produce a wrong error message if a network timeout happened rather than a reply wait timeout (CASSANDRA-16992)
 + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than supporting only direct memory OOMs (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * Commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some cases as no non-daemon threads are active (CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused the state to leak (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
 + * Allow granting or revoking multiple permissions in a single statement (CASSANDRA-17030)
 + * Allow granting permissions on all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution (CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337)
 + * Add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806)
 + * Include SASI components in snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
 + * Allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
 + * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
 + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 (CASSANDRA-17040)
 +
 +Merged from 4.0:
+  * Don't block gossip when clearing repair snapshots (CASSANDRA-17168)
   * Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160)
   * Update ant-junit to version 1.10.12 (CASSANDRA-17218)
   * Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308)
diff --cc src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 9f17578,0000000..6206fca
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@@ -1,405 -1,0 +1,405 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.io.PrintWriter;
 +import java.io.StringWriter;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Sets;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.repair.CommonRange;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static java.lang.String.format;
 +
 +import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +
 +public final class SystemDistributedKeyspace
 +{
 +    private SystemDistributedKeyspace()
 +    {
 +    }
 +
 +    public static final String NAME = "system_distributed";
 +
 +    private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt();
 +    private static final Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class);
 +
 +    /**
 +     * Generation is used as a timestamp for automatic table creation on startup.
 +     * If you make any changes to the tables below, make sure to increment the
 +     * generation and document your change here.
 +     *
 +     * gen 0: original definition in 2.2
 +     * gen 1: (pre-)add options column to parent_repair_history in 3.0, 3.11
 +     * gen 2: (pre-)add coordinator_port and participants_v2 columns to repair_history in 3.0, 3.11, 4.0
 +     * gen 3: gc_grace_seconds raised from 0 to 10 days in CASSANDRA-12954 in 3.11.0
 +     * gen 4: compression chunk length reduced to 16KiB, memtable_flush_period_in_ms now unset on all tables in 4.0
 +     * gen 5: add ttl and TWCS to repair_history tables
 +     * gen 6: add denylist table
 +     */
 +    public static final long GENERATION = 6;
 +
 +    public static final String REPAIR_HISTORY = "repair_history";
 +
 +    public static final String PARENT_REPAIR_HISTORY = "parent_repair_history";
 +
 +    public static final String VIEW_BUILD_STATUS = "view_build_status";
 +
 +    public static final String PARTITION_DENYLIST_TABLE = "partition_denylist";
 +
 +    private static final TableMetadata RepairHistory =
 +        parse(REPAIR_HISTORY,
 +                "Repair history",
 +                "CREATE TABLE %s ("
 +                     + "keyspace_name text,"
 +                     + "columnfamily_name text,"
 +                     + "id timeuuid,"
 +                     + "parent_id timeuuid,"
 +                     + "range_begin text,"
 +                     + "range_end text,"
 +                     + "coordinator inet,"
 +                     + "coordinator_port int,"
 +                     + "participants set<inet>,"
 +                     + "participants_v2 set<text>,"
 +                     + "exception_message text,"
 +                     + "exception_stacktrace text,"
 +                     + "status text,"
 +                     + "started_at timestamp,"
 +                     + "finished_at timestamp,"
 +                     + "PRIMARY KEY ((keyspace_name, columnfamily_name), 
id))")
 +        .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30))
 +        
.compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
 +                                                          
"compaction_window_size","1")))
 +        .build();
 +
 +    private static final TableMetadata ParentRepairHistory =
 +        parse(PARENT_REPAIR_HISTORY,
 +                "Repair history",
 +                "CREATE TABLE %s ("
 +                     + "parent_id timeuuid,"
 +                     + "keyspace_name text,"
 +                     + "columnfamily_names set<text>,"
 +                     + "started_at timestamp,"
 +                     + "finished_at timestamp,"
 +                     + "exception_message text,"
 +                     + "exception_stacktrace text,"
 +                     + "requested_ranges set<text>,"
 +                     + "successful_ranges set<text>,"
 +                     + "options map<text, text>,"
 +                     + "PRIMARY KEY (parent_id))")
 +        .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30))
 +        .compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
 +                                                          "compaction_window_size","1")))
 +        .build();
 +
 +    private static final TableMetadata ViewBuildStatus =
 +        parse(VIEW_BUILD_STATUS,
 +            "Materialized View build status",
 +            "CREATE TABLE %s ("
 +                     + "keyspace_name text,"
 +                     + "view_name text,"
 +                     + "host_id uuid,"
 +                     + "status text,"
 +                     + "PRIMARY KEY ((keyspace_name, view_name), 
host_id))").build();
 +
 +    public static final TableMetadata PartitionDenylistTable =
 +    parse(PARTITION_DENYLIST_TABLE,
 +          "Partition keys which have been denied access",
 +          "CREATE TABLE %s ("
 +          + "ks_name text,"
 +          + "table_name text,"
 +          + "key blob,"
 +          + "PRIMARY KEY ((ks_name, table_name), key))")
 +    .build();
 +
 +    private static TableMetadata.Builder parse(String table, String description, String cql)
 +    {
 +        return CreateTableStatement.parse(format(cql, table), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
 +                                   .id(TableId.forSystemTable(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, table))
 +                                   .comment(description);
 +    }
 +
 +    public static KeyspaceMetadata metadata()
 +    {
 +        return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable));
 +    }
 +
 +    public static void startParentRepair(UUID parent_id, String keyspaceName, String[] cfnames, RepairOption options)
 +    {
 +        Collection<Range<Token>> ranges = options.getRanges();
 +        String query = "INSERT INTO %s.%s (parent_id, keyspace_name, columnfamily_names, requested_ranges, started_at,          options)"+
 +                                 " VALUES (%s,        '%s',          { '%s' },           { '%s' },          toTimestamp(now()), { %s })";
 +        String fmtQry = format(query,
 +                                      SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
 +                                      PARENT_REPAIR_HISTORY,
 +                                      parent_id.toString(),
 +                                      keyspaceName,
 +                                      Joiner.on("','").join(cfnames),
 +                                      Joiner.on("','").join(ranges),
 +                                      toCQLMap(options.asMap(), RepairOption.RANGES_KEY, RepairOption.COLUMNFAMILIES_KEY));
 +        processSilent(fmtQry);
 +    }
 +
 +    private static String toCQLMap(Map<String, String> options, String ... ignore)
 +    {
 +        Set<String> toIgnore = Sets.newHashSet(ignore);
 +        StringBuilder map = new StringBuilder();
 +        boolean first = true;
 +        for (Map.Entry<String, String> entry : options.entrySet())
 +        {
 +            if (!toIgnore.contains(entry.getKey()))
 +            {
 +                if (!first)
 +                    map.append(',');
 +                first = false;
 +                map.append(format("'%s': '%s'", entry.getKey(), entry.getValue()));
 +            }
 +        }
 +        return map.toString();
 +    }
 +
 +    public static void failParentRepair(UUID parent_id, Throwable t)
 +    {
 +        String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), 
exception_message=?, exception_stacktrace=? WHERE parent_id=%s";
 +
 +        StringWriter sw = new StringWriter();
 +        PrintWriter pw = new PrintWriter(sw);
 +        t.printStackTrace(pw);
 +        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, parent_id.toString());
 +        String message = t.getMessage();
 +        processSilent(fmtQuery, message != null ? message : "", sw.toString());
 +    }
 +
 +    public static void successfulParentRepair(UUID parent_id, Collection<Range<Token>> successfulRanges)
 +    {
 +        String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), successful_ranges = {'%s'} WHERE parent_id=%s";
 +        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, Joiner.on("','").join(successfulRanges), parent_id.toString());
 +        processSilent(fmtQuery);
 +    }
 +
 +    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange)
 +    {
 +        // Don't record repair history if an upgrade is in progress, as version 3 nodes generate errors
 +        // due to schema differences
 +        boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes();
 +
 +        InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
 +        Set<String> participants = Sets.newHashSet();
 +        Set<String> participants_v2 = Sets.newHashSet();
 +
 +        for (InetAddressAndPort endpoint : commonRange.endpoints)
 +        {
 +            participants.add(endpoint.getHostAddress(false));
 +            participants_v2.add(endpoint.getHostAddressAndPort());
 +        }
 +
 +        String query =
 +                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
 +                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        %d,               { '%s' },     { '%s' },        '%s',   toTimestamp(now()))";
 +        String queryWithoutNewColumns =
 +                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
 +                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',               { '%s' },        '%s',   toTimestamp(now()))";
 +
 +        for (String cfname : cfnames)
 +        {
 +            for (Range<Token> range : commonRange.ranges)
 +            {
 +                String fmtQry;
 +                if (includeNewColumns)
 +                {
 +                    fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                    keyspaceName,
 +                                    cfname,
 +                                    id.toString(),
 +                                    parent_id.toString(),
 +                                    range.left.toString(),
 +                                    range.right.toString(),
 +                                    coordinator.getHostAddress(false),
 +                                    coordinator.getPort(),
 +                                    Joiner.on("', '").join(participants),
 +                                    Joiner.on("', '").join(participants_v2),
 +                                    RepairState.STARTED.toString());
 +                }
 +                else
 +                {
 +                    fmtQry = format(queryWithoutNewColumns, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                    keyspaceName,
 +                                    cfname,
 +                                    id.toString(),
 +                                    parent_id.toString(),
 +                                    range.left.toString(),
 +                                    range.right.toString(),
 +                                    coordinator.getHostAddress(false),
 +                                    Joiner.on("', '").join(participants),
 +                                    RepairState.STARTED.toString());
 +                }
 +                processSilent(fmtQry);
 +            }
 +        }
 +    }
 +
 +    public static void failRepairs(UUID id, String keyspaceName, String[] cfnames, Throwable t)
 +    {
 +        for (String cfname : cfnames)
 +            failedRepairJob(id, keyspaceName, cfname, t);
 +    }
 +
 +    public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
 +    {
 +        String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
 +        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                        RepairState.SUCCESS.toString(),
 +                                        keyspaceName,
 +                                        cfname,
 +                                        id.toString());
 +        processSilent(fmtQuery);
 +    }
 +
 +    public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable t)
 +    {
 +        String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
 +        StringWriter sw = new StringWriter();
 +        PrintWriter pw = new PrintWriter(sw);
 +        t.printStackTrace(pw);
 +        String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                      RepairState.FAILED.toString(),
 +                                      keyspaceName,
 +                                      cfname,
 +                                      id.toString());
 +        String message = t.getMessage();
 +        if (message == null)
 +            message = t.getClass().getName();
 +        processSilent(fmtQry, message, sw.toString());
 +    }
 +
 +    public static void startViewBuild(String keyspace, String view, UUID hostId)
 +    {
 +        String query = "INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)";
 +        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
 +                               ConsistencyLevel.ONE,
 +                               Lists.newArrayList(bytes(keyspace),
 +                                                  bytes(view),
 +                                                  bytes(hostId),
 +                                                  bytes(BuildStatus.STARTED.toString())));
 +    }
 +
 +    public static void successfulViewBuild(String keyspace, String view, UUID hostId)
 +    {
 +        String query = "UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?";
 +        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
 +                               ConsistencyLevel.ONE,
 +                               Lists.newArrayList(bytes(BuildStatus.SUCCESS.toString()),
 +                                                  bytes(keyspace),
 +                                                  bytes(view),
 +                                                  bytes(hostId)));
 +    }
 +
 +    public static Map<UUID, String> viewStatus(String keyspace, String view)
 +    {
 +        String query = "SELECT host_id, status FROM %s.%s WHERE keyspace_name 
= ? AND view_name = ?";
 +        UntypedResultSet results;
 +        try
 +        {
 +            results = QueryProcessor.execute(format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
 +                                             ConsistencyLevel.ONE,
 +                                             keyspace,
 +                                             view);
 +        }
 +        catch (Exception e)
 +        {
 +            return Collections.emptyMap();
 +        }
 +
 +
 +        Map<UUID, String> status = new HashMap<>();
 +        for (UntypedResultSet.Row row : results)
 +        {
 +            status.put(row.getUUID("host_id"), row.getString("status"));
 +        }
 +        return status;
 +    }
 +
 +    public static void setViewRemoved(String keyspaceName, String viewName)
 +    {
 +        String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND 
view_name = ?";
 +        QueryProcessor.executeInternal(format(buildReq, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS), keyspaceName, 
viewName);
 +        forceBlockingFlush(VIEW_BUILD_STATUS);
 +    }
 +
 +    private static void processSilent(String fmtQry, String... values)
 +    {
 +        try
 +        {
 +            List<ByteBuffer> valueList = new ArrayList<>(values.length);
 +            for (String v : values)
 +            {
 +                valueList.add(bytes(v));
 +            }
-             QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList);
++            QueryProcessor.process(fmtQry, ConsistencyLevel.ANY, valueList);
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Error executing query "+fmtQry, t);
 +        }
 +    }
 +
 +    public static void forceBlockingFlush(String table)
 +    {
 +        if (!DatabaseDescriptor.isUnsafeSystem())
 +            FBUtilities.waitOnFuture(Keyspace.open(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME).getColumnFamilyStore(table).forceFlush());
 +    }
 +
 +    private enum RepairState
 +    {
 +        STARTED, SUCCESS, FAILED
 +    }
 +
 +    private enum BuildStatus
 +    {
 +        UNKNOWN, STARTED, SUCCESS
 +    }
 +}
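
Editor's context: one behavioral change above is easy to miss: processSilent() now issues its repair-history bookkeeping writes at ConsistencyLevel.ANY instead of ONE, presumably so a write can be accepted (even as just a hint) when no replica responds promptly, and any failure is logged rather than propagated. A minimal JDK-only sketch of this "best effort, never throw" pattern; QueryExecutor and the error-logger callback are hypothetical stand-ins for QueryProcessor and the class's slf4j logger.

    import java.util.function.BiConsumer;

    final class BestEffortWrites
    {
        @FunctionalInterface
        interface QueryExecutor
        {
            void execute(String query, Object... values) throws Exception;
        }

        // Attempt the write; on any failure, report it to the logger and move on.
        // Callers never see an exception from repair bookkeeping.
        static void processSilent(QueryExecutor executor,
                                  BiConsumer<String, Throwable> errorLogger,
                                  String query, Object... values)
        {
            try
            {
                executor.execute(query, values);
            }
            catch (Throwable t)
            {
                errorLogger.accept("Error executing query " + query, t);
            }
        }
    }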
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 7d0a290,f2e8b6e..cc72430
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -20,14 -20,11 +20,15 @@@ package org.apache.cassandra.service
  import java.io.IOException;
  import java.net.UnknownHostException;
  import java.util.*;
 -import java.util.concurrent.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
  import javax.management.openmbean.CompositeData;
 +import java.util.concurrent.atomic.AtomicInteger;
  import java.util.function.Predicate;
+ import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
@@@ -36,10 -33,11 +37,9 @@@ import com.google.common.cache.CacheBui
  import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Multimap;
 -import com.google.common.util.concurrent.AbstractFuture;
 -import com.google.common.util.concurrent.ListeningExecutorService;
--import com.google.common.util.concurrent.MoreExecutors;
  
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.config.Config;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.locator.EndpointsByRange;
  import org.apache.cassandra.locator.EndpointsForRange;
@@@ -99,16 -95,7 +99,17 @@@ import org.apache.cassandra.utils.concu
  
  import static com.google.common.collect.Iterables.concat;
  import static com.google.common.collect.Iterables.transform;
 +import static java.util.Collections.synchronizedSet;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +import static org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.reject;
 +import static org.apache.cassandra.config.DatabaseDescriptor.*;
 +import static org.apache.cassandra.net.Message.out;
  import static org.apache.cassandra.net.Verb.PREPARE_MSG;
 +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 +import static org.apache.cassandra.utils.Simulate.With.MONITORS;
++import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
  
  /**
   * ActiveRepairService is the starting point for manual "active" repairs.
@@@ -189,6 -198,11 +190,10 @@@ public class ActiveRepairService implem
      private final Gossiper gossiper;
      private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;
  
 -    private final DebuggableThreadPoolExecutor clearSnapshotExecutor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("RepairClearSnapshot",
 -                                                                                                                              1,
 -                                                                                                                              1,
 -                                                                                                                              TimeUnit.HOURS);
++    private final ExecutorPlus clearSnapshotExecutor = executorFactory().configurePooled("RepairClearSnapshot", 1)
++                                                                        .withKeepAlive(1, TimeUnit.HOURS)
++                                                                        .build();
+ 
      public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper)
      {
          this.failureDetector = failureDetector;
@@@ -698,10 -706,22 +703,22 @@@
          ParentRepairSession session = parentRepairSessions.remove(parentSessionId);
          if (session == null)
              return null;
-         for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+ 
+         if (session.hasSnapshots)
          {
-             if (cfs.snapshotExists(snapshotName))
-                 cfs.clearSnapshot(snapshotName);
+             clearSnapshotExecutor.submit(() -> {
+                 logger.info("[repair #{}] Clearing snapshots for {}", 
parentSessionId,
+                             session.columnFamilyStores.values()
+                                                       .stream()
+                                                       .map(cfs -> 
cfs.metadata().toString()).collect(Collectors.joining(", ")));
 -                long startNanos = System.nanoTime();
++                long startNanos = nanoTime();
+                 for (ColumnFamilyStore cfs : 
session.columnFamilyStores.values())
+                 {
+                     if (cfs.snapshotExists(snapshotName))
+                         cfs.clearSnapshot(snapshotName);
+                 }
 -                logger.info("[repair #{}] Cleared snapshots in {}ms", 
parentSessionId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
++                logger.info("[repair #{}] Cleared snapshots in {}ms", 
parentSessionId, TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
+             });
          }
          return session;
      }
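
Editor's context: the net effect of the final hunk is that, when session.hasSnapshots is set, removeParentRepairSession() now returns immediately, and the deletion loop plus its timing log run later on the RepairClearSnapshot thread (the real coverage lives in the new ClearSnapshotTest). A small self-contained demo of that non-blocking behavior, reusing the hypothetical AsyncSnapshotClearer/SnapshotOwner sketch from earlier in this mail; the 100 ms sleep stands in for slow snapshot deletion.

    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    public final class NonBlockingClearDemo
    {
        public static void main(String[] args) throws InterruptedException
        {
            CountDownLatch cleared = new CountDownLatch(1);
            SnapshotOwner slowOwner = new SnapshotOwner()
            {
                public boolean snapshotExists(String name) { return true; }
                public void clearSnapshot(String name)
                {
                    try { Thread.sleep(100); } catch (InterruptedException ignore) {}
                    cleared.countDown();
                }
            };

            long start = System.nanoTime();
            new AsyncSnapshotClearer().clearAsync(List.of(slowOwner), "repair-1234");
            long callMicros = (System.nanoTime() - start) / 1_000;

            // The submitting call returns in microseconds, long before deletion finishes.
            System.out.println("clearAsync returned after " + callMicros + "us");
            cleared.await();
            System.out.println("snapshot actually cleared");
        }
    }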
