Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d434a33a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d434a33a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d434a33a Branch: refs/heads/cassandra-2.2 Commit: d434a33ace2dfe6715f4857f9537ee884f4ef410 Parents: 73a730f a8e8a67 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Nov 17 10:07:04 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Nov 17 10:07:04 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/messages/RepairOption.java | 3 +++ .../org/apache/cassandra/service/StorageService.java | 4 ++++ .../cassandra/repair/messages/RepairOptionTest.java | 13 +++++++++++-- 4 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 5705453,b6b394a..489a76d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,16 -1,5 +1,17 @@@ -2.1.12 +2.2.4 + * Fix SimpleDateType type compatibility (CASSANDRA-10027) + * (Hadoop) fix splits calculation (CASSANDRA-10640) + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) + * Use most up-to-date version of schema for system tables (CASSANDRA-10652) + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) +Merged from 2.1: + * Reject incremental repair with subrange repair (CASSANDRA-10422) * Add a nodetool command to refresh size_estimates (CASSANDRA-9579) * Shutdown compaction in drain to prevent leak (CASSANDRA-10079) * Invalidate cache after stream receive task is completed (CASSANDRA-10341) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/messages/RepairOption.java index f3e452c,0000000..1780b6b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@@ -1,308 -1,0 +1,311 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.tools.nodetool.Repair; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Repair options. + */ +public class RepairOption +{ + public static final String PARALLELISM_KEY = "parallelism"; + public static final String PRIMARY_RANGE_KEY = "primaryRange"; + public static final String INCREMENTAL_KEY = "incremental"; + public static final String JOB_THREADS_KEY = "jobThreads"; + public static final String RANGES_KEY = "ranges"; + public static final String COLUMNFAMILIES_KEY = "columnFamilies"; + public static final String DATACENTERS_KEY = "dataCenters"; + public static final String HOSTS_KEY = "hosts"; + public static final String TRACE_KEY = "trace"; + + // we don't want to push nodes too much for repair + public static final int MAX_JOB_THREADS = 4; + + private static final Logger logger = LoggerFactory.getLogger(RepairOption.class); + + /** + * Construct RepairOptions object from given map of Strings. + * <p> + * Available options are: + * + * <table> + * <caption>Repair Options</caption> + * <thead> + * <tr> + * <th>key</th> + * <th>value</th> + * <th>default (when key not given)</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>parallelism</td> + * <td>"sequential", "parallel" or "dc_parallel"</td> + * <td>"sequential"</td> + * </tr> + * <tr> + * <td>primaryRange</td> + * <td>"true" if perform repair only on primary range.</td> + * <td>false</td> + * </tr> + * <tr> + * <td>incremental</td> + * <td>"true" if perform incremental repair.</td> + * <td>false</td> + * </tr> + * <tr> + * <td>trace</td> + * <td>"true" if repair is traced.</td> + * <td>false</td> + * </tr> + * <tr> + * <td>jobThreads</td> + * <td>Number of threads to use to run repair job.</td> + * <td>1</td> + * </tr> + * <tr> + * <td>ranges</td> + * <td>Ranges to repair. A range is expressed as <start token>:<end token> + * and multiple ranges can be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd).</td> + * <td></td> + * </tr> + * <tr> + * <td>columnFamilies</td> + * <td>Specify names of ColumnFamilies to repair. + * Multiple ColumnFamilies can be given as comma separated values(e.g. cf1,cf2,cf3).</td> + * <td></td> + * </tr> + * <tr> + * <td>dataCenters</td> + * <td>Specify names of data centers who participate in this repair. + * Multiple data centers can be given as comma separated values(e.g. dc1,dc2,dc3).</td> + * <td></td> + * </tr> + * <tr> + * <td>hosts</td> + * <td>Specify names of hosts who participate in this repair. + * Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td> + * <td></td> + * </tr> + * </tbody> + * </table> + * + * @param options options to parse + * @param partitioner partitioner is used to construct token ranges + * @return RepairOptions object + */ + public static RepairOption parse(Map<String, String> options, IPartitioner partitioner) + { + // if no parallel option is given, then this will be "sequential" by default. + RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY)); + boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY)); + boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); + boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY)); + + int jobThreads = 1; + if (options.containsKey(JOB_THREADS_KEY)) + { + try + { + jobThreads = Integer.parseInt(options.get(JOB_THREADS_KEY)); + } + catch (NumberFormatException ignore) {} + } + // ranges + String rangesStr = options.get(RANGES_KEY); + Set<Range<Token>> ranges = new HashSet<>(); + if (rangesStr != null) + { ++ if (incremental) ++ throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair " + ++ "because each subrange repair would generate an anti-compacted table"); + StringTokenizer tokenizer = new StringTokenizer(rangesStr, ","); + while (tokenizer.hasMoreTokens()) + { + String[] rangeStr = tokenizer.nextToken().split(":", 2); + if (rangeStr.length < 2) + { + continue; + } + Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim()); + Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim()); + ranges.add(new Range<>(parsedBeginToken, parsedEndToken)); + } + } + + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges); + + // data centers + String dataCentersStr = options.get(DATACENTERS_KEY); + Collection<String> dataCenters = new HashSet<>(); + if (dataCentersStr != null) + { + StringTokenizer tokenizer = new StringTokenizer(dataCentersStr, ","); + while (tokenizer.hasMoreTokens()) + { + dataCenters.add(tokenizer.nextToken().trim()); + } + option.getDataCenters().addAll(dataCenters); + } + + // hosts + String hostsStr = options.get(HOSTS_KEY); + Collection<String> hosts = new HashSet<>(); + if (hostsStr != null) + { + StringTokenizer tokenizer = new StringTokenizer(hostsStr, ","); + while (tokenizer.hasMoreTokens()) + { + hosts.add(tokenizer.nextToken().trim()); + } + option.getHosts().addAll(hosts); + } + + // columnfamilies + String cfStr = options.get(COLUMNFAMILIES_KEY); + if (cfStr != null) + { + Collection<String> columnFamilies = new HashSet<>(); + StringTokenizer tokenizer = new StringTokenizer(cfStr, ","); + while (tokenizer.hasMoreTokens()) + { + columnFamilies.add(tokenizer.nextToken().trim()); + } + option.getColumnFamilies().addAll(columnFamilies); + } + + // validate options + if (jobThreads > MAX_JOB_THREADS) + { + throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS); + } + if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty())) + { + throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); + } + + return option; + } + + private final RepairParallelism parallelism; + private final boolean primaryRange; + private final boolean incremental; + private final boolean trace; + private final int jobThreads; + + private final Collection<String> columnFamilies = new HashSet<>(); + private final Collection<String> dataCenters = new HashSet<>(); + private final Collection<String> hosts = new HashSet<>(); + private final Collection<Range<Token>> ranges = new HashSet<>(); + + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges) + { + if (FBUtilities.isWindows() && + (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) && + parallelism == RepairParallelism.SEQUENTIAL) + { + logger.warn("Sequential repair disabled when memory-mapped I/O is configured on Windows. Reverting to parallel."); + this.parallelism = RepairParallelism.PARALLEL; + } + else + this.parallelism = parallelism; + + this.primaryRange = primaryRange; + this.incremental = incremental; + this.trace = trace; + this.jobThreads = jobThreads; + this.ranges.addAll(ranges); + } + + public RepairParallelism getParallelism() + { + return parallelism; + } + + public boolean isPrimaryRange() + { + return primaryRange; + } + + public boolean isIncremental() + { + return incremental; + } + + public boolean isTraced() + { + return trace; + } + + public int getJobThreads() + { + return jobThreads; + } + + public Collection<String> getColumnFamilies() + { + return columnFamilies; + } + + public Collection<Range<Token>> getRanges() + { + return ranges; + } + + public Collection<String> getDataCenters() + { + return dataCenters; + } + + public Collection<String> getHosts() + { + return hosts; + } + + public boolean isGlobal() + { + return dataCenters.isEmpty() && hosts.isEmpty(); + } + @Override + public String toString() + { + return "repair options (" + + "parallelism: " + parallelism + + ", primary range: " + primaryRange + + ", incremental: " + incremental + + ", job threads: " + jobThreads + + ", ColumnFamilies: " + columnFamilies + + ", dataCenters: " + dataCenters + + ", hosts: " + hosts + + ", # of ranges: " + ranges.size() + + ')'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 74b3c73,03c1960..b5ce38b --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2958,47 -2819,25 +2958,51 @@@ public class StorageService extends Not { throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree); } - Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); - - logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", - repairingRange, keyspaceName, columnFamilies); - RepairParallelism parallelism = RepairParallelism.values()[parallelismDegree]; - return forceRepairAsync(keyspaceName, parallelism, dataCenters, hosts, repairingRange, fullRepair, columnFamilies); - } + if (FBUtilities.isWindows() && parallelism != RepairParallelism.PARALLEL) + { + logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); + parallelism = RepairParallelism.PARALLEL; + } + - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) - { + if (!fullRepair) - throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair because " + - "each subrange repair would generate an anti-compacted table"); ++ throw new IllegalArgumentException("Incremental repair can't be requested with subrange repair " + ++ "because each subrange repair would generate an anti-compacted table"); Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); + RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange); + options.getDataCenters().addAll(dataCenters); + if (hosts != null) + { + options.getHosts().addAll(hosts); + } + if (columnFamilies != null) + { + for (String columnFamily : columnFamilies) + { + options.getColumnFamilies().add(columnFamily); + } + } + logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", - repairingRange, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, isSequential, isLocal, repairingRange, fullRepair, columnFamilies); + repairingRange, keyspaceName, columnFamilies); + return forceRepairAsync(keyspaceName, options); + } + + public int forceRepairRangeAsync(String beginToken, + String endToken, + String keyspaceName, + boolean isSequential, + boolean isLocal, + boolean fullRepair, + String... columnFamilies) + { + Set<String> dataCenters = null; + if (isLocal) + { + dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); + } + return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, fullRepair, columnFamilies); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java index 11ae69f,0000000..3257a10 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java @@@ -1,96 -1,0 +1,105 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.Test; + ++import com.google.common.collect.ImmutableMap; ++ +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.*; + +public class RepairOptionTest +{ + @Test + public void testParseOptions() + { + IPartitioner partitioner = Murmur3Partitioner.instance; + Token.TokenFactory tokenFactory = partitioner.getTokenFactory(); + + // parse with empty options + RepairOption option = RepairOption.parse(new HashMap<String, String>(), partitioner); + + if (FBUtilities.isWindows() && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard)) + assertTrue(option.getParallelism() == RepairParallelism.PARALLEL); + else + assertTrue(option.getParallelism() == RepairParallelism.SEQUENTIAL); + + assertFalse(option.isPrimaryRange()); + assertFalse(option.isIncremental()); + + // parse everything + Map<String, String> options = new HashMap<>(); + options.put(RepairOption.PARALLELISM_KEY, "parallel"); + options.put(RepairOption.PRIMARY_RANGE_KEY, "false"); - options.put(RepairOption.INCREMENTAL_KEY, "true"); ++ options.put(RepairOption.INCREMENTAL_KEY, "false"); + options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30"); + options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3"); + options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3"); + options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3"); + + option = RepairOption.parse(options, partitioner); + assertTrue(option.getParallelism() == RepairParallelism.PARALLEL); + assertFalse(option.isPrimaryRange()); - assertTrue(option.isIncremental()); ++ assertFalse(option.isIncremental()); + + Set<Range<Token>> expectedRanges = new HashSet<>(3); + expectedRanges.add(new Range<>(tokenFactory.fromString("0"), tokenFactory.fromString("10"))); + expectedRanges.add(new Range<>(tokenFactory.fromString("11"), tokenFactory.fromString("20"))); + expectedRanges.add(new Range<>(tokenFactory.fromString("21"), tokenFactory.fromString("30"))); + assertEquals(expectedRanges, option.getRanges()); + + Set<String> expectedCFs = new HashSet<>(3); + expectedCFs.add("cf1"); + expectedCFs.add("cf2"); + expectedCFs.add("cf3"); + assertEquals(expectedCFs, option.getColumnFamilies()); + + Set<String> expectedDCs = new HashSet<>(3); + expectedDCs.add("dc1"); + expectedDCs.add("dc2"); + expectedDCs.add("dc3"); + assertEquals(expectedDCs, option.getDataCenters()); + + Set<String> expectedHosts = new HashSet<>(3); + expectedHosts.add("127.0.0.1"); + expectedHosts.add("127.0.0.2"); + expectedHosts.add("127.0.0.3"); + assertEquals(expectedHosts, option.getHosts()); + } ++ ++ @Test(expected=IllegalArgumentException.class) ++ public void testIncrementalRepairWithSubrangesThrows() throws Exception ++ { ++ RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, ""), ++ Murmur3Partitioner.instance); ++ } +}