Updated Branches: refs/heads/cassandra-1.2 9bb4d93e3 -> 39066b722
Add SSTableSplitter tool to split sstables offline patch by slebresne; reviewed by krummas for CASSANDRA-4766 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39066b72 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39066b72 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39066b72 Branch: refs/heads/cassandra-1.2 Commit: 39066b722607fa88e75d5bc772bd52f1ec8914a0 Parents: 9bb4d93 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Sun Nov 11 14:54:43 2012 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Aug 22 10:08:10 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 +- bin/sstablesplit | 50 ++++ debian/cassandra.install | 1 + .../cassandra/db/compaction/CompactionTask.java | 14 +- .../db/compaction/SSTableSplitter.java | 105 ++++++++ .../cassandra/tools/StandaloneSplitter.java | 256 +++++++++++++++++++ 7 files changed, 427 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e1c963c..e887c27 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -26,6 +26,7 @@ * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903) * Properly handle parsing huge map and set literals (CASSANDRA-5893) * Fix LCS L0 compaction may overlap in L1 (CASSANDRA-5907) + * New sstablesplit tool to split large sstables offline (CASSANDRA-4766) Merged from 1.1: * Correctly validate sparse composite cells in scrub (CASSANDRA-5855) http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 11281ee..491a438 100644 --- a/NEWS.txt +++ 
b/NEWS.txt @@ -18,9 +18,10 @@ using the provided 'sstableupgrade' tool. Features -------- - - A history of executed nodetool commands is now captured. + - A history of executed nodetool commands is now captured. It can be found in ~/.cassandra/nodetool.history. Other tools' output files (cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well. + - A new sstablesplit utility allows splitting large sstables offline. Defaults -------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/bin/sstablesplit ---------------------------------------------------------------------- diff --git a/bin/sstablesplit b/bin/sstablesplit new file mode 100755 index 0000000..933a67d --- /dev/null +++ b/bin/sstablesplit @@ -0,0 +1,50 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$CASSANDRA_INCLUDE" = "x" ]; then + for include in /usr/share/cassandra/cassandra.in.sh \ + /usr/local/share/cassandra/cassandra.in.sh \ + /opt/cassandra/cassandra.in.sh \ + ~/.cassandra.in.sh \ + `dirname $0`/cassandra.in.sh; do + if [ -r $include ]; then + . $include + break + fi + done +elif [ -r $CASSANDRA_INCLUDE ]; then + .
$CASSANDRA_INCLUDE
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ -z $CLASSPATH ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+$JAVA -ea -cp $CLASSPATH -Xmx256M \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.StandaloneSplitter "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index a504b78..70d4b97 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -18,6 +18,7 @@ bin/sstableloader usr/bin
 bin/cqlsh usr/bin
 bin/sstablescrub usr/bin
 bin/sstableupgrade usr/bin
+bin/sstablesplit usr/bin
 bin/cassandra-shuffle usr/bin
 tools/bin/cassandra-stress usr/bin
 tools/bin/token-generator usr/bin
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a6b9f89..0fed0a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -98,7 +98,7 @@ public class CompactionTask extends AbstractCompactionTask
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+        CompactionController controller = getCompactionController(toCompact);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -227,7 +227,7 @@ public class CompactionTask extends AbstractCompactionTask
                 collector.finishCompaction(ci);
         }
 
-        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
+        replaceCompactedSSTables(toCompact, sstables);
         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
         for (SSTableReader sstable : sstables)
         {
@@ -265,6 +265,16 @@ public class CompactionTask extends AbstractCompactionTask
         }
     }
 
+    /**
+     * Installs {@code replacements} in place of {@code compacted}; called once the compaction
+     * has written its output sstables. The default delegates to
+     * {@code cfs.replaceCompactedSSTables(compacted, replacements, compactionType)}.
+     * Protected hook so that subclasses can change (or skip) how the result is installed.
+     */
+    protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+    {
+        cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
+    }
+
+    /**
+     * Creates the controller used while compacting {@code toCompact}. The default uses this
+     * task's {@code gcBefore}; protected hook so that subclasses can supply a controller
+     * with different purge rules.
+     */
+    protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+    {
+        return new CompactionController(cfs, toCompact, gcBefore);
+    }
+
     protected boolean partialCompactionsAcceptable()
     {
         return !isUserDefined;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
new file mode 100644
index 0000000..214c7a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+
+/**
+ * Rewrites a single sstable by running a {@link SplittingCompactionTask} over it. The task
+ * reports its segment threshold as reached once the current writer's on-disk size exceeds
+ * {@code sstableSizeInMB} MB (CompactionTask presumably then starts a new output sstable).
+ * <p>
+ * The source sstable is NOT removed from the live set here: the task's
+ * {@code replaceCompactedSSTables} is a no-op. Callers (e.g. the offline StandaloneSplitter
+ * tool) mark the source sstable compacted themselves after {@link #split()} returns.
+ */
+public class SSTableSplitter {
+
+    private final SplittingCompactionTask task;
+
+    // Info holder of the running compaction, recorded by StatsCollector.beginCompaction().
+    // NOTE(review): written but never read in this class.
+    private CompactionInfo.Holder info;
+
+    /**
+     * @param cfs             column family store the sstable belongs to
+     * @param sstable         the sstable to split
+     * @param sstableSizeInMB target maximum size of each output sstable, in MB
+     * @throws IllegalArgumentException if {@code sstableSizeInMB <= 0}
+     */
+    public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+    {
+        this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+    }
+
+    /**
+     * Runs the splitting compaction through the task's {@code execute} method.
+     *
+     * @throws IOException propagated from the compaction
+     */
+    public void split() throws IOException
+    {
+        task.execute(new StatsCollector());
+    }
+
+    /** Collector handed to the task: remembers the compaction's info holder and ignores completion. */
+    public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
+    {
+        public void beginCompaction(CompactionInfo.Holder ci)
+        {
+            SSTableSplitter.this.info = ci;
+        }
+
+        public void finishCompaction(CompactionInfo.Holder ci)
+        {
+            // no-op
+        }
+    }
+
+    /**
+     * Compaction of a single sstable that only re-partitions its data. It uses a
+     * {@link SplitController} (whose shouldPurge always returns false) and leaves the data
+     * tracker untouched. It signals a new segment once the writer exceeds
+     * {@code sstableSizeInMB} MB on disk. It always accepts partial compactions, whereas the
+     * CompactionTask default refuses them for user-defined compactions.
+     */
+    public static class SplittingCompactionTask extends CompactionTask
+    {
+        // Target output size in MB; validated to be > 0 in the constructor.
+        private final int sstableSizeInMB;
+
+        public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+        {
+            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+            this.sstableSizeInMB = sstableSizeInMB;
+
+            // Validated after super(...) because the superclass constructor call must come first.
+            if (sstableSizeInMB <= 0)
+                throw new IllegalArgumentException("Invalid target size for SSTables, must be > 0 (got: " + sstableSizeInMB + ")");
+        }
+
+        @Override
+        protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+        {
+            return new SplitController(cfs, toCompact);
+        }
+
+        @Override
+        protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+        {
+            // Intentionally empty: the live sstable set is not updated here. The caller retires the
+            // source sstable itself (StandaloneSplitter marks it compacted after split()).
+        }
+
+        @Override
+        protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
+        {
+            // The long literals keep the MB -> bytes conversion in 64-bit arithmetic (no int overflow).
+            return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
+        }
+
+        @Override
+        protected boolean partialCompactionsAcceptable()
+        {
+            return true;
+        }
+    }
+
+    /**
+     * Controller that never purges: shouldPurge always returns false, so (presumably) deleted
+     * data and tombstones are carried over unchanged. {@code toCompact} is accepted to match
+     * the getCompactionController hook's signature but is not used; gcBefore is NO_GC.
+     */
+    public static class SplitController extends CompactionController
+    {
+        public SplitController(ColumnFamilyStore cfs, Collection<SSTableReader> toCompact)
+        {
+            super(cfs, CompactionManager.NO_GC);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+        {
+            return false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
new file mode 100644
index 0000000..1ce94be
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+/**
+ * Offline command-line tool (launched by bin/sstablesplit) that splits the given sstables
+ * into new sstables of roughly {@code --size} MB each (default {@link #DEFAULT_SSTABLE_SIZE}).
+ * <p>
+ * All input files must belong to the same keyspace and column family. Unless
+ * {@code --no-snapshot} is given, each input is linked into a "pre-split-&lt;millis&gt;"
+ * snapshot (via {@code SSTableReader.createLinks}) before being split.
+ */
+public class StandaloneSplitter
+{
+    /** Target size, in MB, of the output sstables when --size is not given. */
+    public static final int DEFAULT_SSTABLE_SIZE = 50;
+
+    static
+    {
+        CassandraDaemon.initLog4j();
+    }
+
+    // Printed in the usage line; must match the launcher script name (bin/sstablesplit).
+    private static final String TOOL_NAME = "sstablesplit";
+    private static final String VERBOSE_OPTION = "verbose";
+    private static final String DEBUG_OPTION = "debug";
+    private static final String HELP_OPTION = "help";
+    private static final String NO_SNAPSHOT_OPTION = "no-snapshot";
+    private static final String SIZE_OPTION = "size";
+
+    /**
+     * Entry point. Loads the schema and groups the given files by sstable. All files must share
+     * one keyspace/column family. It then opens each sstable without validation, optionally
+     * snapshots it, splits it, and marks the original compacted.
+     * <p>
+     * Per-sstable load/split failures are reported and skipped. The JVM is always terminated
+     * with System.exit: 0 after processing, 1 on a fatal error.
+     */
+    public static void main(String[] args) throws IOException
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            DatabaseDescriptor.loadSchemas();
+
+            String ksName = null;
+            String cfName = null;
+            Map<Descriptor, Set<Component>> parsedFilenames = new HashMap<Descriptor, Set<Component>>();
+            for (String filename : options.filenames)
+            {
+                File file = new File(filename);
+                if (!file.exists())
+                {
+                    System.out.println("Skipping inexisting file " + file);
+                    continue;
+                }
+
+                // getParentFile() is null for a bare file name (e.g. run from the data directory),
+                // so resolve the directory against the working directory first.
+                File directory = file.getAbsoluteFile().getParentFile();
+                Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(directory, file.getName());
+                if (pair == null)
+                {
+                    System.out.println("Skipping non sstable file " + file);
+                    continue;
+                }
+                Descriptor desc = pair.left;
+
+                if (ksName == null)
+                    ksName = desc.ksname;
+                else if (!ksName.equals(desc.ksname))
+                    throw new IllegalArgumentException("All sstables must be part of the same keyspace");
+
+                if (cfName == null)
+                    cfName = desc.cfname;
+                else if (!cfName.equals(desc.cfname))
+                    throw new IllegalArgumentException("All sstables must be part of the same column family");
+
+                // Keep only the components that actually exist on disk for this sstable.
+                Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
+                                                                                  Component.PRIMARY_INDEX,
+                                                                                  Component.FILTER,
+                                                                                  Component.COMPRESSION_INFO,
+                                                                                  Component.STATS));
+
+                Iterator<Component> iter = components.iterator();
+                while (iter.hasNext())
+                {
+                    Component component = iter.next();
+                    if (!(new File(desc.filenameFor(component)).exists()))
+                        iter.remove();
+                }
+                parsedFilenames.put(desc, components);
+            }
+
+            if (ksName == null || cfName == null)
+            {
+                System.err.println("No valid sstables to split");
+                System.exit(1);
+            }
+
+            // Do not load sstables since they might be broken
+            Table table = Table.openWithoutSSTables(ksName);
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+            String snapshotName = "pre-split-" + System.currentTimeMillis();
+
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+            for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
+            {
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(fn.getKey(), fn.getValue(), cfs.metadata);
+
+                    if (options.snapshot)
+                    {
+                        File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                        sstable.createLinks(snapshotDirectory.getPath());
+                    }
+
+                    // Only queue the sstable for splitting once its snapshot (if requested) exists:
+                    // splitting retires the original, so a failed snapshot must not be followed by a split.
+                    sstables.add(sstable);
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error Loading %s: %s", fn.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            if (options.snapshot)
+                System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
+
+            cfs.getDataTracker().markCompacting(sstables);
+            for (SSTableReader sstable : sstables)
+            {
+                try
+                {
+                    new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+
+                    // The split output replaces the original: mark it compacted and drop our reference
+                    // (SSTableDeletingTask.waitForDeletions below waits for the resulting deletions).
+                    sstable.markCompacted();
+                    sstable.releaseReference();
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            SSTableDeletingTask.waitForDeletions();
+            System.exit(0); // We need that to stop non daemonized threads
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    /** Parsed command-line settings. */
+    private static class Options
+    {
+        // sstable file names given as positional arguments
+        public final List<String> filenames;
+
+        public boolean debug;
+        // NOTE(review): parsed from --verbose but not currently read by main()
+        public boolean verbose;
+        public boolean snapshot;
+        // target output size in MB; always > 0 once parseArgs returns
+        public int sizeInMB;
+
+        private Options(List<String> filenames)
+        {
+            this.filenames = filenames;
+        }
+
+        /**
+         * Parses the command line. Prints the usage text and exits the JVM on --help, on a
+         * missing file list, on a malformed command line, or on an invalid --size value.
+         */
+        public static Options parseArgs(String[] cmdArgs)
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length == 0)
+                {
+                    System.err.println("No sstables to split");
+                    printUsage(options);
+                    System.exit(1);
+                }
+                Options opts = new Options(Arrays.asList(args));
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.snapshot = !cmd.hasOption(NO_SNAPSHOT_OPTION);
+                // Without an explicit default, sizeInMB stayed 0 and every split was rejected by
+                // SplittingCompactionTask's "> 0" check, contrary to the advertised default.
+                opts.sizeInMB = DEFAULT_SSTABLE_SIZE;
+
+                if (cmd.hasOption(SIZE_OPTION))
+                    opts.sizeInMB = parseSize(cmd.getOptionValue(SIZE_OPTION), options);
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        /** Parses a --size value; prints usage and exits unless it is a positive integer. */
+        private static int parseSize(String value, CmdLineOptions options)
+        {
+            int size = 0;
+            try
+            {
+                size = Integer.parseInt(value);
+            }
+            catch (NumberFormatException ignored)
+            {
+                // size stays 0 and is reported by the check below
+            }
+            if (size <= 0)
+                errorMsg(String.format("Invalid --%s value '%s': must be a positive integer (MB)", SIZE_OPTION, value), options);
+            return size;
+        }
+
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION, "display stack traces");
+            options.addOption("v", VERBOSE_OPTION, "verbose output");
+            options.addOption("h", HELP_OPTION, "display this help message");
+            options.addOption(null, NO_SNAPSHOT_OPTION, "don't snapshot the sstables before splitting");
+            options.addOption("s", SIZE_OPTION, "size", "maximum size in MB for the output sstables (default: " + DEFAULT_SSTABLE_SIZE + ")");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <filename> [<filename>]*", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Split the provided sstables files in sstables of maximum provided file size (see option --" + SIZE_OPTION + ")." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}