Author: slebresne Date: Tue Jul 19 10:44:48 2011 New Revision: 1148267 URL: http://svn.apache.org/viewvc?rev=1148267&view=rev Log: Add simplified interfaces to write sstables (for bulk loading) patch by slebresne; reviewed by jbellis for CASSANDRA-2911
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1148267&r1=1148266&r2=1148267&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jul 19 10:44:48 2011 @@ -30,6 +30,8 @@ * avoid including inferred types in CF update (CASSANDRA-2809) * fix JMX bulkload call (CASSANDRA-2908) * fix updating KS with durable_writes=false (CASSANDRA-2907) + * add simplified facade to SSTableWriter for bulk loading use + (CASSANDRA-2911) 0.8.1 Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java?rev=1148267&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java Tue Jul 19 10:44:48 2011 @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.NodeId; +import org.apache.cassandra.utils.Pair; + +public abstract class AbstractSSTableSimpleWriter +{ + protected final File directory; + protected final CFMetaData metadata; + protected DecoratedKey currentKey; + protected ColumnFamily columnFamily; + protected SuperColumn currentSuperColumn; + protected final NodeId nodeid = NodeId.generate(); + + public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata) + { + this.metadata = metadata; + this.directory = directory; + } + + protected SSTableWriter getWriter() throws IOException + { + return new SSTableWriter( + makeFilename(directory, metadata.ksName, metadata.cfName), + 0, // We don't care about the bloom filter + metadata, + StorageService.getPartitioner(), + ReplayPosition.NONE); + } + + // find available generation and pick up filename from that + private static String makeFilename(File directory, final String keyspace, final String columnFamily) + { + final Set<Descriptor> existing = new HashSet<Descriptor>(); + directory.list(new FilenameFilter() + { + public boolean accept(File dir, String name) + { + Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name); + Descriptor desc = p == null ? null : p.left; + if (desc == null) + return false; + + if (desc.cfname.equals(columnFamily)) + existing.add(desc); + + return false; + } + }); + int maxGen = 0; + for (Descriptor desc : existing) + maxGen = Math.max(maxGen, desc.generation); + return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, false).filenameFor(Component.DATA); + } + + /** + * Start a new row whose key is {@code key}. + * @param key the row key + */ + public void newRow(ByteBuffer key) throws IOException + { + if (currentKey != null && !columnFamily.isEmpty()) + writeRow(currentKey, columnFamily); + + currentKey = StorageService.getPartitioner().decorateKey(key); + columnFamily = ColumnFamily.create(metadata); + } + + /** + * Start a new super column with name {@code name}. + * @param name the name for the super column + */ + public void newSuperColumn(ByteBuffer name) + { + if (!columnFamily.isSuper()) + throw new IllegalStateException("Cannot add a super column to a standard column family"); + + currentSuperColumn = new SuperColumn(name, metadata.subcolumnComparator); + columnFamily.addColumn(currentSuperColumn); + } + + private void addColumn(IColumn column) + { + if (columnFamily.isSuper() && currentSuperColumn == null) + throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started."); + + IColumnContainer container = columnFamily.isSuper() ? currentSuperColumn : columnFamily; + container.addColumn(column); + } + + /** + * Insert a new "regular" column to the current row (and super column if applicable). + * @param name the column name + * @param value the column value + * @param timestamp the column timestamp + */ + public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) + { + addColumn(new Column(name, value, timestamp)); + } + + /** + * Insert a new expiring column to the current row (and super column if applicable). + * @param name the column name + * @param value the column value + * @param timestamp the column timestamp + * @param ttl the column time to live in seconds + * @param expirationTimestamp the local expiration timestamp in milliseconds. This is the server time timestamp used for actually + * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents + * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000)}. + */ + public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) + { + addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000))); + } + + /** + * Insert a new counter column to the current row (and super column if applicable). + * @param name the column name + * @param value the value of the counter + */ + public void addCounterColumn(ByteBuffer name, long value) + { + addColumn(new CounterColumn(name, CounterContext.instance().create(nodeid, 1L, value, false), System.currentTimeMillis())); + } + + /** + * Close this writer. + * This method should be called, otherwise the produced sstables are not + * guaranteed to be complete (and won't be in practice). + */ + public abstract void close() throws IOException; + + protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException; +} Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java?rev=1148267&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java Tue Jul 19 10:44:48 2011 @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.service.StorageService; + +/** + * A SSTable writer that doesn't assume rows are in sorted order. + * This writer buffers rows in memory and then write them all in sorted order. + * To avoid loading the entire data set in memory, the amount of rows buffered + * is configurable. Each time the threshold is met, one SSTable will be + * created (and the buffer be reseted). + * + * @see AbstractSSTableSimpleWriter + */ +public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter +{ + private final Map<DecoratedKey, ColumnFamily> keys = new TreeMap<DecoratedKey, ColumnFamily>(); + private final long bufferSize; + private long currentSize; + + /** + * Create a new buffering writer. + * @param directory the directory where to write the sstables + * @param keyspace the keyspace name + * @param columnFamily the column family name + * @param comparator the column family comparator + * @param subComparator the column family subComparator or null if not a Super column family. + * @param bufferSizeInMB the data size in MB before which a sstable is written and the buffer reseted. This correspond roughly to the written + * data size (i.e. the size of the create sstable). The actual size used in memory will be higher (by how much depends on the size of the + * columns you add). For 1GB of heap, a 128 bufferSizeInMB is probably a reasonable choice. If you experience OOM, this value should be lowered. + */ + public SSTableSimpleUnsortedWriter(File directory, + String keyspace, + String columnFamily, + AbstractType comparator, + AbstractType subComparator, + int bufferSizeInMB) throws IOException + { + super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator)); + this.bufferSize = bufferSizeInMB * 1024 * 1024; + } + + protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException + { + ColumnFamily previous = keys.put(key, columnFamily); + currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2; + + // Note that if the row was existing already, our size estimation will be slightly off + // since we'll be counting the key multiple times. + if (previous != null) + columnFamily.addAll(previous); + + if (currentSize > bufferSize) + sync(); + } + + public void close() throws IOException + { + sync(); + } + + private void sync() throws IOException + { + if (keys.isEmpty()) + return; + + SSTableWriter writer = getWriter(); + for (Map.Entry<DecoratedKey, ColumnFamily> entry : keys.entrySet()) + { + writer.append(entry.getKey(), entry.getValue()); + } + writer.closeAndOpenReader(); + currentSize = 0; + keys.clear(); + } +} Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java?rev=1148267&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java Tue Jul 19 10:44:48 2011 @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.AbstractType; + +/** + * A SSTable writer that assumes rows are in (partitioner) sorted order. + * Contrarily to SSTableSimpleUnsortedWriter, this writer does not buffer + * anything into memory, however it assumes that row are added in sorted order + * (an exception will be thrown otherwise), which for the RandomPartitioner + * means that rows should be added by increasing md5 of the row key. This is + * rarely possible and SSTableSimpleUnsortedWriter should most of the time be + * prefered. + * + * @see AbstractSSTableSimpleWriter + */ +public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter +{ + private final SSTableWriter writer; + + /** + * Create a new writer. + * @param directory the directory where to write the sstable + * @param keyspace the keyspace name + * @param columnFamily the column family name + * @param comparator the column family comparator + * @param subComparator the column family subComparator or null if not a Super column family. + */ + public SSTableSimpleWriter(File directory, + String keyspace, + String columnFamily, + AbstractType comparator, + AbstractType subComparator) throws IOException + { + this(directory, + new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator)); + } + + public SSTableSimpleWriter(File directory, CFMetaData metadata) throws IOException + { + super(directory, metadata); + writer = getWriter(); + } + + public void close() throws IOException + { + if (currentKey != null) + writeRow(currentKey, columnFamily); + writer.closeAndOpenReader(); + } + + protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException + { + writer.append(key, columnFamily); + } +}