Updated Branches: refs/heads/trunk b8de48851 -> 5eb9e1c15
support custom sstable components patch by Piotr Kołaczkowski; reviewed by jbellis for CASSANDRA-4049 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5eb9e1c1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5eb9e1c1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5eb9e1c1 Branch: refs/heads/trunk Commit: 5eb9e1c1576edb90f5b6e2ef975686e87a8c93af Parents: b8de488 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Oct 23 15:16:03 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Oct 23 15:16:03 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/io/sstable/Component.java | 59 +++++---- .../org/apache/cassandra/io/sstable/SSTable.java | 111 ++++++++++++--- .../apache/cassandra/io/sstable/SSTableWriter.java | 12 +- 3 files changed, 134 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb9e1c1/src/java/org/apache/cassandra/io/sstable/Component.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 7a001ab..cbc12d9 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -34,7 +34,7 @@ public class Component public static final char separator = '-'; final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class); - enum Type + public enum Type { // the base data for an sstable: the remaining components can be regenerated // based on the data component @@ -54,7 +54,11 @@ public class Component // holds sha1 sum of the data file (to be checked by sha1sum) DIGEST("Digest.sha1"), // holds SSTable Index Summary and Boundaries - SUMMARY("Summary.db"); + 
SUMMARY("Summary.db"), + // table of contents, stores the list of all components for the sstable + TOC("TOC.txt"), + // custom component, used by e.g. custom compaction strategy + CUSTOM(null); final String repr; Type(String repr) @@ -67,34 +71,37 @@ public class Component for (Type type : TYPES) if (repr.equals(type.repr)) return type; - throw new RuntimeException("Invalid SSTable component: '" + repr + "'"); + return CUSTOM; } } // singleton components for types that don't need ids - public final static Component DATA = new Component(Type.DATA, -1); - public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX, -1); - public final static Component FILTER = new Component(Type.FILTER, -1); - public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER, -1); - public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO, -1); - public final static Component STATS = new Component(Type.STATS, -1); - public final static Component DIGEST = new Component(Type.DIGEST, -1); - public final static Component SUMMARY = new Component(Type.SUMMARY, -1); + public final static Component DATA = new Component(Type.DATA); + public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX); + public final static Component FILTER = new Component(Type.FILTER); + public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER); + public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO); + public final static Component STATS = new Component(Type.STATS); + public final static Component DIGEST = new Component(Type.DIGEST); + public final static Component SUMMARY = new Component(Type.SUMMARY); + public final static Component TOC = new Component(Type.TOC); public final Type type; - public final int id; + public final String name; public final int hashCode; public Component(Type type) { - this(type, -1); + this(type, type.repr); + assert type != Type.CUSTOM; } - 
public Component(Type type, int id) + public Component(Type type, String name) { + assert name != null : "Component name cannot be null"; this.type = type; - this.id = id; - this.hashCode = Objects.hashCode(type, id); + this.name = name; + this.hashCode = Objects.hashCode(type, name); } /** @@ -102,7 +109,7 @@ public class Component */ public String name() { - return type.repr; + return name; } /** @@ -120,14 +127,16 @@ public class Component Component component; switch(type) { - case DATA: component = Component.DATA; break; - case PRIMARY_INDEX: component = Component.PRIMARY_INDEX; break; - case FILTER: component = Component.FILTER; break; - case COMPACTED_MARKER: component = Component.COMPACTED_MARKER; break; - case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break; - case STATS: component = Component.STATS; break; - case DIGEST: component = Component.DIGEST; break; + case DATA: component = Component.DATA; break; + case PRIMARY_INDEX: component = Component.PRIMARY_INDEX; break; + case FILTER: component = Component.FILTER; break; + case COMPACTED_MARKER: component = Component.COMPACTED_MARKER; break; + case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break; + case STATS: component = Component.STATS; break; + case DIGEST: component = Component.DIGEST; break; case SUMMARY: component = Component.SUMMARY; break; + case TOC: component = Component.TOC; break; + case CUSTOM: component = new Component(Type.CUSTOM, path.right); break; default: throw new IllegalStateException(); } @@ -149,7 +158,7 @@ public class Component if (!(o instanceof Component)) return false; Component that = (Component)o; - return this.type == that.type && this.id == that.id; + return this.type == that.type && this.name.equals(that.name); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb9e1c1/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 8b4bafb..00bc2d7 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -17,20 +17,24 @@ */ package org.apache.cassandra.io.sstable; -import java.io.File; -import java.io.IOException; +import java.io.*; +import java.nio.charset.Charset; import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; +import com.google.common.base.Predicates; +import com.google.common.collect.Collections2; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; - -import org.apache.cassandra.db.DecoratedKey; +import com.google.common.io.Files; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; @@ -103,7 +107,7 @@ public abstract class SSTable assert component.type != Component.Type.COMPACTED_MARKER; this.compression = dataComponents.contains(Component.COMPRESSION_INFO); - this.components = Collections.unmodifiableSet(dataComponents); + this.components = new CopyOnWriteArraySet<Component>(dataComponents); this.metadata = metadata; this.partitioner = partitioner; } @@ -183,36 +187,49 @@ public abstract class SSTable } /** - * @return A Descriptor,Component pair, or null if not a valid sstable component. + * @return A Descriptor,Component pair. If component is of unknown type, returns CUSTOM component. 
*/ public static Pair<Descriptor,Component> tryComponentFromFilename(File dir, String name) { + return Component.fromFilename(dir, name); + } + + /** + * Discovers existing components for the descriptor. Slow: only intended for use outside the critical path. + */ + static Set<Component> componentsFor(final Descriptor desc) + { try { - return Component.fromFilename(dir, name); + try + { + return readTOC(desc); + } + catch (FileNotFoundException e) + { + Set<Component> components = discoverComponentsFor(desc); + if (!components.contains(Component.TOC)) + components.add(Component.TOC); + appendTOC(desc, components); + return components; + } } - catch (Exception e) + catch (IOException e) { - if (!"snapshots".equals(name) && !"backups".equals(name) - && !name.contains(".json")) - logger.warn("Invalid file '{}' in data directory {}.", name, dir); - return null; + throw new IOError(e); } } - /** - * Discovers existing components for the descriptor. Slow: only intended for use outside the critical path. - */ - static Set<Component> componentsFor(final Descriptor desc) + private static Set<Component> discoverComponentsFor(Descriptor desc) { - Set<Component> components = Sets.newHashSetWithExpectedSize(Component.TYPES.size()); - for (Component.Type componentType : Component.TYPES) + Set<Component.Type> knownTypes = Sets.difference(Component.TYPES, Collections.singleton(Component.Type.CUSTOM)); + Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size()); + for (Component.Type componentType : knownTypes) { Component component = new Component(componentType); if (new File(desc.filenameFor(component)).exists()) components.add(component); } - return components; } @@ -261,4 +278,60 @@ public abstract class SSTable "path='" + getFilename() + '\'' + ')'; } + + /** + * Reads the list of components from the TOC component. 
+ * @return set of components found in the TOC + */ + protected static Set<Component> readTOC(Descriptor descriptor) throws IOException + { + File tocFile = new File(descriptor.filenameFor(Component.TOC)); + List<String> componentNames = Files.readLines(tocFile, Charset.defaultCharset()); + Set<Component> components = Sets.newHashSetWithExpectedSize(componentNames.size()); + for (String componentName : componentNames) + { + Component component = new Component(Component.Type.fromRepresentation(componentName), componentName); + if (!new File(descriptor.filenameFor(component)).exists()) + logger.error("Missing component: " + descriptor.filenameFor(component)); + else + components.add(component); + } + return components; + } + + /** + * Appends new component names to the TOC component. + */ + protected static void appendTOC(Descriptor descriptor, Collection<Component> components) + { + File tocFile = new File(descriptor.filenameFor(Component.TOC)); + PrintWriter w = null; + try + { + w = new PrintWriter(new FileWriter(tocFile, true)); + for (Component component : components) + w.println(component.name); + } + catch (IOException e) + { + throw new FSWriteError(e, tocFile); + } + finally + { + FileUtils.closeQuietly(w); + } + } + + /** + * Registers new custom components. Used by custom compaction strategies. + * Adding a component for the second time is a no-op. + * Don't remove this - this method is a part of the public API, intended for use by custom compaction strategies. 
+ * @param newComponents collection of components to be added + */ + public synchronized void addComponents(Collection<Component> newComponents) + { + Collection<Component> componentsToAdd = Collections2.filter(newComponents, Predicates.not(Predicates.in(components))); + appendTOC(descriptor, componentsToAdd); + components.addAll(componentsToAdd); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb9e1c1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 06e6826..c17de4c 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -62,10 +62,11 @@ public class SSTableWriter extends SSTable private static Set<Component> components(CFMetaData metadata) { Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, - Component.FILTER, - Component.PRIMARY_INDEX, - Component.STATS, - Component.SUMMARY)); + Component.FILTER, + Component.PRIMARY_INDEX, + Component.STATS, + Component.SUMMARY, + Component.TOC)); if (metadata.compressionParameters().sstableCompressor != null) components.add(Component.COMPRESSION_INFO); @@ -324,6 +325,9 @@ public class SSTableWriter extends SSTable writeMetadata(descriptor, sstableMetadata); maybeWriteDigest(); + // save the table of components + SSTable.appendTOC(descriptor, components); + // remove the 'tmp' marker from all components final Descriptor newdesc = rename(descriptor, components);