Author: jbellis
Date: Tue Jul 26 04:30:43 2011
New Revision: 1150984

URL: http://svn.apache.org/viewvc?rev=1150984&view=rev
Log:
Remove SSTableWriter.Builder
patch by jbellis and stuhood; reviewed by Yuki Morishita for CASSANDRA-2920

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jul 26 04:30:43 2011 @@ -857,39 +857,6 @@ public class CompactionManager implement return executor.submit(runnable); } - /** - * Submits an sstable to be rebuilt: is not scheduled, since the sstable must not exist. - */ - public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type) - { - // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession. 
- final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type); - Callable<SSTableReader> callable = new Callable<SSTableReader>() - { - public SSTableReader call() throws IOException - { - compactionLock.readLock().lock(); - try - { - executor.beginCompaction(builder); - try - { - return builder.build(); - } - finally - { - executor.finishCompaction(builder); - } - } - finally - { - compactionLock.readLock().unlock(); - } - } - }; - return executor.submit(callable); - } - public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer) { Runnable runnable = new WrappedRunnable() Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jul 26 04:30:43 2011 @@ -170,6 +170,11 @@ public class SSTableWriter extends SSTab afterAppend(decoratedKey, currentPosition); } + public void updateMaxTimestamp(long timestamp) + { + sstableMetadataCollector.updateMaxTimestamp(timestamp); + } + /** * Attempt to close the index writer and data file before deleting all temp components for the sstable */ @@ -258,351 +263,6 @@ public class SSTableWriter extends SSTab { return dataFile.getFilePointer(); } - - public static Builder createBuilder(Descriptor desc, OperationType type) - { - if (!desc.isLatestVersion) - // TODO: streaming between different versions will fail: need support for - // recovering other versions to provide a stable streaming api - throw new RuntimeException(String.format("Cannot recover SSTable %s due to version mismatch. 
(current version is %s).", desc.toString() - , Descriptor.CURRENT_VERSION)); - - return new Builder(desc, type); - } - - /** - * Removes the given SSTable from temporary status and opens it, rebuilding the - * bloom filter and row index from the data file. - * - * TODO remove this post-1.0, we have one-pass streaming now (see IncomingStreamReader) - */ - public static class Builder implements CompactionInfo.Holder - { - private final Descriptor desc; - private final OperationType type; - private final ColumnFamilyStore cfs; - private RowIndexer indexer; - - public Builder(Descriptor desc, OperationType type) - { - this.desc = desc; - this.type = type; - cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname); - } - - public CompactionInfo getCompactionInfo() - { - maybeOpenIndexer(); - try - { - // both file offsets are still valid post-close - return new CompactionInfo(desc.ksname, - desc.cfname, - CompactionType.SSTABLE_BUILD, - indexer.dfile.getFilePointer(), - indexer.dfile.length()); - } - catch (IOException e) - { - throw new IOError(e); - } - } - - // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager, - // since the 8MB buffers can use up heap quickly - private void maybeOpenIndexer() - { - if (indexer != null) - return; - try - { - if (cfs.metadata.getDefaultValidator().isCommutative()) - indexer = new CommutativeRowIndexer(desc, cfs, type); - else - indexer = new RowIndexer(desc, cfs, type); - } - catch (IOException e) - { - throw new IOError(e); - } - } - - public SSTableReader build() throws IOException - { - try - { - if (cfs.isInvalid()) - return null; - maybeOpenIndexer(); - - File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX)); - File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER)); - assert !ifile.exists(); - assert !ffile.exists(); - - long estimatedRows = indexer.prepareIndexing(); - - // build the index and filter - long rows = indexer.index(); - - 
logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows); - return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, Descriptor.TempState.ANY))); - } - finally - { - cleanupIfNecessary(); - } - } - - /** - * Attempt to close the index writer before deleting all temp components for the sstable - */ - public void cleanupIfNecessary() - { - FileUtils.closeQuietly(indexer); - - try - { - Set<Component> components = SSTable.componentsFor(desc, Descriptor.TempState.TEMP); - if (!components.isEmpty()) - SSTable.delete(desc, components); - } - catch (Exception e) - { - logger.error(String.format("Failed deleting temp components for %s", desc), e); - } - } - - } - - static class RowIndexer implements Closeable - { - protected final Descriptor desc; - public final RandomAccessReader dfile; - private final OperationType type; - - protected IndexWriter iwriter; - protected ColumnFamilyStore cfs; - protected final SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(); - - RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException - { - this(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type); - } - - protected RowIndexer(Descriptor desc, RandomAccessReader dfile, ColumnFamilyStore cfs, OperationType type) throws IOException - { - this.desc = desc; - this.dfile = dfile; - this.type = type; - this.cfs = cfs; - } - - long prepareIndexing() throws IOException - { - long estimatedRows; - try - { - estimatedRows = SSTable.estimateRowsFromData(desc, dfile); - iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows); - return estimatedRows; - } - catch(IOException e) - { - dfile.close(); - throw e; - } - } - - long index() throws IOException - { - try - { - return doIndexing(); - } - finally - { - try - { - close(); - } - catch (IOException e) - { - throw new IOError(e); - } - } - } - - public 
void close() throws IOException - { - dfile.close(); - iwriter.close(); - } - - /* - * If the key is cached, we should: - * - For AES: run the newly received row by the cache - * - For other: invalidate the cache (even if very unlikely, a key could be in cache in theory if a neighbor was boostrapped and - * then removed quickly afterward (a key that we had lost but become responsible again could have stayed in cache). That key - * would be obsolete and so we must invalidate the cache). - */ - protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException - { - ColumnFamily cached = cfs.getRawCachedRow(key); - if (cached != null) - { - switch (type) - { - case AES: - if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) - { - // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable - // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning. - logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled."); - cfs.invalidateCachedRow(key); - } - else - { - ColumnFamily cf; - if (row == null) - { - // If not provided, read from disk. 
- long position = dfile.getFilePointer(); - cf = ColumnFamily.create(cfs.metadata); - ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true); - dfile.seek(position); - } - else - { - assert row instanceof PrecompactedRow; - // we do not purge so we should not get a null here - cf = ((PrecompactedRow)row).getFullColumnFamily(); - } - cfs.updateRowCache(key, cf); - } - break; - default: - cfs.invalidateCachedRow(key); - break; - } - } - } - - protected long doIndexing() throws IOException - { - long rows = 0; - DecoratedKey key; - long rowPosition = 0; - ColumnFamily cf = ColumnFamily.create(cfs.metadata); - while (rowPosition < dfile.length()) - { - // read key - key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile)); - iwriter.afterAppend(key, rowPosition); - - // seek to next key - long dataSize = SSTableReader.readRowSize(dfile, desc); - rowPosition = dfile.getFilePointer() + dataSize; - - IndexHelper.skipBloomFilter(dfile); - IndexHelper.skipIndex(dfile); - ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile); - - // We can't simply get the max column timestamp here by calling cf.maxTimestamp() because - // the columns have not been deserialized yet. observeColumnsInSSTable() will deserialize - // and get the max timestamp instead. 
- ColumnFamily.serializer().observeColumnsInSSTable(cfs.metadata, dfile, sstableMetadataCollector); - - // don't move that statement around, it expects the dfile to be before the columns - updateCache(key, dataSize, null); - - sstableMetadataCollector.addRowSize(dataSize); - - dfile.seek(rowPosition); - - rows++; - } - writeMetadata(desc, sstableMetadataCollector.finalizeMetadata()); - return rows; - } - - public String toString() - { - return "RowIndexer(" + desc + ")"; - } - } - - /* - * When a sstable for a counter column family is streamed, we must ensure - * that on the receiving node all counter column goes through the - * deserialization from remote code path (i.e, it must be cleared from its - * delta) to maintain the invariant that on a given node, only increments - * that the node originated are delta (and copy of those must not be delta). - * - * Since after streaming row indexation goes through every streamed - * sstable, we use this opportunity to ensure this property. This is the - * goal of this specific CommutativeRowIndexer. 
- */ - static class CommutativeRowIndexer extends RowIndexer - { - protected SequentialWriter writerDfile; - - CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException - { - super(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type); - writerDfile = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true); - } - - @Override - protected long doIndexing() throws IOException - { - long rows = 0L; - DecoratedKey key; - - CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true); - while (!dfile.isEOF()) - { - // read key - key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile)); - - // skip data size, bloom filter, column index - long dataSize = SSTableReader.readRowSize(dfile, desc); - SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true); - - AbstractCompactedRow row = controller.getCompactedRow(iter); - updateCache(key, dataSize, row); - - sstableMetadataCollector.addRowSize(dataSize); - sstableMetadataCollector.addColumnCount(row.columnCount()); - sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp()); - - // update index writer - iwriter.afterAppend(key, writerDfile.getFilePointer()); - // write key and row - ByteBufferUtil.writeWithShortLength(key.key, writerDfile.stream); - row.write(writerDfile.stream); - - rows++; - } - writeMetadata(desc, sstableMetadataCollector.finalizeMetadata()); - - if (writerDfile.getFilePointer() != dfile.getFilePointer()) - { - // truncate file to new, reduced length - writerDfile.truncate(writerDfile.getFilePointer()); - } - writerDfile.sync(); - - return rows; - } - - @Override - public void close() throws IOException - { - super.close(); - writerDfile.close(); - } - } /** * 
Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jul 26 04:30:43 2011 @@ -28,9 +28,6 @@ import org.apache.cassandra.gms.Gossiper import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.EncryptionOptions; -import org.apache.cassandra.security.streaming.SSLIncomingStreamReader; import org.apache.cassandra.streaming.IncomingStreamReader; import org.apache.cassandra.streaming.StreamHeader; @@ -171,9 +168,6 @@ public class IncomingTcpConnection exten private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException { - if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all) - new SSLIncomingStreamReader(streamHeader, socket, input).read(); - else - new IncomingStreamReader(streamHeader, socket).read(); + new IncomingStreamReader(streamHeader, socket).read(); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java (original) +++ 
cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Tue Jul 26 04:30:43 2011 @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.security.streaming; - -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.io.IOException; -import java.io.DataInputStream; - -import org.apache.cassandra.streaming.FileStreamTask; -import org.apache.cassandra.streaming.IncomingStreamReader; -import org.apache.cassandra.streaming.StreamHeader; - -/** - * This class uses a DataInputStream to read data as opposed to a FileChannel.transferFrom - * used by IncomingStreamReader because the underlying SSLServerSocket doesn't support - * encrypting over NIO SocketChannel. 
- */ -public class SSLIncomingStreamReader extends IncomingStreamReader -{ - private final DataInputStream input; - - public SSLIncomingStreamReader(StreamHeader header, Socket socket, DataInputStream input) throws IOException - { - super(header, socket); - this.input = input; - } - - @Override - protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException - { - int toRead = (int)Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead); - ByteBuffer buf = ByteBuffer.allocate(toRead); - input.readFully(buf.array()); - fc.write(buf); - bytesRead += buf.limit(); - remoteFile.progress += buf.limit(); - return bytesRead; - } -} Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Tue Jul 26 04:30:43 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.streaming; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.util.Collections; @@ -52,12 +51,12 @@ public class IncomingStreamReader protected final PendingFile localFile; protected final PendingFile remoteFile; - private final SocketChannel socketChannel; protected final StreamInSession session; + private final Socket socket; public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException { - this.socketChannel = socket.getChannel(); + this.socket = socket; InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress(); session = 
StreamInSession.get(remoteAddress.getAddress(), header.sessionId); session.addFiles(header.pendingFiles); @@ -72,26 +71,19 @@ public class IncomingStreamReader public void read() throws IOException { if (remoteFile != null) - readFile(); - - session.closeIfFinished(); - } - - protected void readFile() throws IOException - { - if (logger.isDebugEnabled()) { - logger.debug("Receiving stream"); - logger.debug("Creating file for {} with {} estimated keys", - localFile.getFilename(), - remoteFile.estimatedKeys); - } + if (logger.isDebugEnabled()) + { + logger.debug("Receiving stream"); + logger.debug("Creating file for {} with {} estimated keys", + localFile.getFilename(), + remoteFile.estimatedKeys); + } - SSTableReader reader = null; - if (remoteFile.estimatedKeys > 0) - { + assert remoteFile.estimatedKeys > 0; + SSTableReader reader = null; logger.debug("Estimated keys {}", remoteFile.estimatedKeys); - DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream()); + DataInputStream dis = new DataInputStream(socket.getInputStream()); try { reader = streamIn(dis, localFile, remoteFile); @@ -105,53 +97,11 @@ public class IncomingStreamReader { dis.close(); } - } - else - { - // backwards compatibility path - FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true); - FileChannel fc = fos.getChannel(); - long offset = 0; - try - { - for (Pair<Long, Long> section : localFile.sections) - { - long length = section.right - section.left; - long bytesRead = 0; - while (bytesRead < length) - { - bytesRead = readnwrite(length, bytesRead, offset, fc); - } - offset += length; - } - } - catch (IOException ex) - { - retry(); - throw ex; - } - finally - { - fc.close(); - } + session.finished(remoteFile, reader); } - session.finished(remoteFile, localFile, reader); - } - - protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException - { - long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - 
bytesRead); - long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead); - // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read - // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and - // raise an exception (that will be catch belove and 'the right thing' will be done). - if (lastRead == 0) - throw new IOException("Transfer failed for remote file " + remoteFile); - bytesRead += lastRead; - remoteFile.progress += lastRead; - return bytesRead; + session.closeIfFinished(); } private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException @@ -183,6 +133,8 @@ public class IncomingStreamReader SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true); AbstractCompactedRow row = controller.getCompactedRow(iter); writer.append(row); + // row append does not update the max timestamp on its own + writer.updateMaxTimestamp(row.maxTimestamp()); if (row instanceof PrecompactedRow) { Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Jul 26 04:30:43 2011 @@ -50,7 +50,6 @@ public class StreamInSession private final Pair<InetAddress, Long> context; private final Runnable callback; private String table; - private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>(); private final List<SSTableReader> readers = new ArrayList<SSTableReader>(); private 
PendingFile current; @@ -103,22 +102,13 @@ public class StreamInSession } } - public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException + public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException { if (logger.isDebugEnabled()) logger.debug("Finished {}. Sending ack to {}", remoteFile, this); - if (reader != null) - { - // SSTR was already built during streaming - readers.add(reader); - } - else - { - Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type); - buildFutures.add(future); - } - + assert reader != null; + readers.add(reader); files.remove(remoteFile); if (remoteFile.equals(current)) current = null; @@ -143,33 +133,6 @@ public class StreamInSession List<SSTableReader> referenced = new LinkedList<SSTableReader>(); try { - for (Future<SSTableReader> future : buildFutures) - { - try - { - SSTableReader sstable = future.get(); - assert sstable.getTableName().equals(table); - - // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races - sstable.acquireReference(); - referenced.add(sstable); - - ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName()); - cfs.addSSTable(sstable); - if (!cfstores.containsKey(cfs)) - cfstores.put(cfs, new ArrayList<SSTableReader>()); - cfstores.get(cfs).add(sstable); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } - } - for (SSTableReader sstable : readers) { assert sstable.getTableName().equals(table); Modified: cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java?rev=1150984&r1=1150983&r2=1150984&view=diff 
============================================================================== --- cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java (original) +++ cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java Tue Jul 26 04:30:43 2011 @@ -73,33 +73,6 @@ public class LongCompactionSpeedTest ext testCompaction(100, 800, 5); } - /** - * Test aes counter repair with a very wide row. - */ - @Test - public void testAESCountersRepairWide() throws Exception - { - testAESCountersRepair(2, 1, 500000); - } - - /** - * Test aes counter repair with lots of skinny rows. - */ - @Test - public void testAESCountersRepairSlim() throws Exception - { - testAESCountersRepair(2, 500000, 1); - } - - /** - * Test aes counter repair with lots of small sstables. - */ - @Test - public void testAESCounterRepairMany() throws Exception - { - testAESCountersRepair(100, 1000, 5); - } - protected void testCompaction(int sstableCount, int rowsPerSSTable, int colsPerRow) throws Exception { CompactionManager.instance.disableAutoCompaction(); @@ -140,64 +113,4 @@ public class LongCompactionSpeedTest ext colsPerRow, System.currentTimeMillis() - start)); } - - protected void testAESCountersRepair(int sstableCount, final int rowsPerSSTable, final int colsPerRow) throws Exception - { - final String cfName = "Counter1"; - CompactionManager.instance.disableAutoCompaction(); - - ArrayList<SSTableReader> sstables = new ArrayList<SSTableReader>(); - for (int k = 0; k < sstableCount; k++) - { - final int sstableNum = k; - SSTableReader sstable = SSTableUtils.prepare().ks(TABLE1).cf(cfName).write(rowsPerSSTable, new SSTableUtils.Appender(){ - int written = 0; - public boolean append(SSTableWriter writer) throws IOException - { - if (written > rowsPerSSTable) - return false; - - DecoratedKey key = Util.dk(String.format("%020d", written)); - ColumnFamily cf = ColumnFamily.create(TABLE1, cfName); - for (int i = 0; i < colsPerRow; i++) - 
cf.addColumn(createCounterColumn(String.valueOf(i))); - writer.append(key, cf); - written++; - return true; - } - }); - - // whack the index to trigger the recover - FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)); - FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.FILTER)); - - sstables.add(sstable); - } - - // give garbage collection a bit of time to catch up - Thread.sleep(1000); - - long start = System.currentTimeMillis(); - - for (SSTableReader sstable : sstables) - CompactionManager.instance.submitSSTableBuild(sstable.descriptor, OperationType.AES).get(); - - System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms", - this.getClass().getName(), - sstableCount, - rowsPerSSTable, - colsPerRow, - System.currentTimeMillis() - start)); - } - - protected CounterColumn createCounterColumn(String name) - { - ContextState context = ContextState.allocate(4, 1); - context.writeElement(NodeId.fromInt(1), 4L, 2L, true); - context.writeElement(NodeId.fromInt(2), 4L, 2L); - context.writeElement(NodeId.fromInt(4), 3L, 3L); - context.writeElement(NodeId.fromInt(8), 2L, 4L); - - return new CounterColumn(ByteBufferUtil.bytes(name), context.context, 0L); - } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Tue Jul 26 04:30:43 2011 @@ -31,10 +31,12 @@ import org.apache.cassandra.db.Column; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IColumn; +import 
org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.Util; +import static org.junit.Assert.assertEquals; public class SSTableUtils { @@ -74,6 +76,48 @@ public class SSTableUtils return datafile; } + public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException + { + SSTableScanner slhs = lhs.getDirectScanner(2048); + SSTableScanner srhs = rhs.getDirectScanner(2048); + while (slhs.hasNext()) + { + IColumnIterator ilhs = slhs.next(); + assert srhs.hasNext() : "LHS contained more rows than RHS"; + IColumnIterator irhs = srhs.next(); + assertContentEquals(ilhs, irhs); + } + assert !srhs.hasNext() : "RHS contained more rows than LHS"; + } + + public static void assertContentEquals(IColumnIterator lhs, IColumnIterator rhs) throws IOException + { + assertEquals(lhs.getKey(), rhs.getKey()); + // check metadata + ColumnFamily lcf = lhs.getColumnFamily(); + ColumnFamily rcf = rhs.getColumnFamily(); + if (lcf == null) + { + if (rcf == null) + return; + throw new AssertionError("LHS had no content for " + rhs.getKey()); + } + else if (rcf == null) + throw new AssertionError("RHS had no content for " + lhs.getKey()); + assertEquals(lcf.getMarkedForDeleteAt(), rcf.getMarkedForDeleteAt()); + assertEquals(lcf.getLocalDeletionTime(), rcf.getLocalDeletionTime()); + // iterate columns + while (lhs.hasNext()) + { + IColumn clhs = lhs.next(); + assert rhs.hasNext() : "LHS contained more columns than RHS for " + lhs.getKey(); + IColumn crhs = rhs.next(); + + assertEquals("Mismatched columns for " + lhs.getKey(), clhs, crhs); + } + assert !rhs.hasNext() : "RHS contained more columns than LHS for " + lhs.getKey(); + } + /** * @return A Context with chainable methods to configure and write a SSTable. 
*/ @@ -190,6 +234,7 @@ public class SSTableUtils long start = System.currentTimeMillis(); while (appender.append(writer)) { /* pass */ } SSTableReader reader = writer.closeAndOpenReader(); + reader.acquireReference(); // mark all components for removal if (cleanup) for (Component component : reader.components) Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1150984&r1=1150983&r2=1150984&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Jul 26 04:30:43 2011 @@ -20,7 +20,9 @@ package org.apache.cassandra.streaming; */ import static junit.framework.Assert.assertEquals; +import org.apache.cassandra.Util; import static org.apache.cassandra.Util.column; +import static org.apache.cassandra.Util.addMutation; import java.net.InetAddress; import java.util.*; @@ -29,6 +31,7 @@ import org.apache.cassandra.CleanupHelpe import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; @@ -41,6 +44,7 @@ import org.apache.cassandra.thrift.Index import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.NodeId; import org.junit.BeforeClass; import org.junit.Test; @@ -56,32 +60,26 @@ public class StreamingTransferTest exten StorageService.instance.initServer(); } - @Test - public void 
testTransferTable() throws Exception + /** + * Create and transfer a single sstable, and return the keys that should have been transferred. + * The Mutator must create the given column, but it may also create any other columns it pleases. + */ + private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception { - Table table = Table.open("Keyspace1"); - ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1"); - // write a temporary SSTable, and unregister it + long timestamp = 1234; for (int i = 1; i <= 3; i++) - { - String key = "key" + i; - String col = "col" + i; - RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key)); - ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily); - cf.addColumn(column(col, "v", 0)); - cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes((long) i), 0)); - rm.add(cf); - rm.apply(); - } + mutator.mutate("key" + i, "col" + i, timestamp); cfs.forceBlockingFlush(); - assert cfs.getSSTables().size() == 1; + Util.compactAll(cfs).get(); + assertEquals(1, cfs.getSSTables().size()); SSTableReader sstable = cfs.getSSTables().iterator().next(); // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it sstable.acquireReference(); cfs.removeAllSSTables(); // transfer the first and last key + int[] offs = new int[]{1, 3}; IPartitioner p = StorageService.getPartitioner(); List<Range> ranges = new ArrayList<Range>(); ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); @@ -90,27 +88,133 @@ public class StreamingTransferTest exten StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP); session.await(); - // confirm that the SSTable was transferred and registered - List<Row> rows = Util.getRangeSlice(cfs); - assertEquals(2, rows.size()); - assert rows.get(0).key.key.equals( ByteBufferUtil.bytes("key1")); - assert 
rows.get(1).key.key.equals( ByteBufferUtil.bytes("key3")); - assertEquals(2, rows.get(0).cf.getColumnsMap().size()); - assertEquals(2, rows.get(1).cf.getColumnsMap().size()); - assert rows.get(1).cf.getColumn(ByteBufferUtil.bytes("col3")) != null; + // confirm that a single SSTable was transferred and registered + assertEquals(1, cfs.getSSTables().size()); // and that the index and filter were properly recovered - assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key1"), new QueryPath(cfs.columnFamily))); - assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key3"), new QueryPath(cfs.columnFamily))); + List<Row> rows = Util.getRangeSlice(cfs); + assertEquals(offs.length, rows.size()); + for (int i = 0; i < offs.length; i++) + { + String key = "key" + offs[i]; + String col = "col" + offs[i]; + assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key), + new QueryPath(cfs.columnFamily))); + assert rows.get(i).key.key.equals(ByteBufferUtil.bytes(key)); + assert rows.get(i).cf.getColumn(ByteBufferUtil.bytes(col)) != null; + } - // and that the secondary index works - IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(3L)); - IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100); - IFilter filter = new IdentityQueryFilter(); - Range range = new Range(p.getMinimumToken(), p.getMinimumToken()); - rows = cfs.scan(clause, range, filter); - assertEquals(1, rows.size()); - assert rows.get(0).key.key.equals( ByteBufferUtil.bytes("key3")) ; + // and that the max timestamp for the file was rediscovered + assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp()); + + List<String> keys = new ArrayList<String>(); + for (int off : offs) + keys.add("key" + off); + return keys; + } + + @Test + public void testTransferTable() throws Exception + { + final Table table = Table.open("Keyspace1"); 
+ final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1"); + + List<String> keys = createAndTransfer(table, cfs, new Mutator() + { + public void mutate(String key, String col, long timestamp) throws Exception + { + long val = key.hashCode(); + RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key)); + ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily); + cf.addColumn(column(col, "v", timestamp)); + cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp)); + rm.add(cf); + rm.apply(); + } + }); + + // confirm that the secondary index was recovered + for (String key : keys) + { + long val = key.hashCode(); + IPartitioner p = StorageService.getPartitioner(); + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), + IndexOperator.EQ, + ByteBufferUtil.bytes(val)); + IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100); + IFilter filter = new IdentityQueryFilter(); + Range range = new Range(p.getMinimumToken(), p.getMinimumToken()); + List<Row> rows = cfs.scan(clause, range, filter); + assertEquals(1, rows.size()); + assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key)); + } + } + + @Test + public void testTransferTableSuper() throws Exception + { + final Table table = Table.open("Keyspace1"); + final ColumnFamilyStore cfs = table.getColumnFamilyStore("Super1"); + + createAndTransfer(table, cfs, new Mutator() + { + public void mutate(String key, String col, long timestamp) throws Exception + { + RowMutation rm = new RowMutation(table.name, ByteBufferUtil.bytes(key)); + addMutation(rm, cfs.columnFamily, col, 1, "val1", timestamp); + rm.apply(); + } + }); + } + + @Test + public void testTransferTableCounter() throws Exception + { + final Table table = Table.open("Keyspace1"); + final ColumnFamilyStore cfs = table.getColumnFamilyStore("Counter1"); + final CounterContext cc = new CounterContext(); + + final 
Map<String, ColumnFamily> cleanedEntries = new HashMap<String, ColumnFamily>(); + + List<String> keys = createAndTransfer(table, cfs, new Mutator() + { + /** Creates a new SSTable per key: all will be merged before streaming. */ + public void mutate(String key, String col, long timestamp) throws Exception + { + Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>(); + ColumnFamily cf = ColumnFamily.create(cfs.metadata); + ColumnFamily cfCleaned = ColumnFamily.create(cfs.metadata); + CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 1); + state.writeElement(NodeId.fromInt(2), 9L, 3L, true); + state.writeElement(NodeId.fromInt(4), 4L, 2L); + state.writeElement(NodeId.fromInt(6), 3L, 3L); + state.writeElement(NodeId.fromInt(8), 2L, 4L); + cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), + state.context, + timestamp)); + cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), + cc.clearAllDelta(state.context), + timestamp)); + + entries.put(key, cf); + cleanedEntries.put(key, cfCleaned); + cfs.addSSTable(SSTableUtils.prepare() + .ks(table.name) + .cf(cfs.columnFamily) + .generation(0) + .write(entries)); + } + }); + + // filter pre-cleaned entries locally, and ensure that the end result is equal + cleanedEntries.keySet().retainAll(keys); + SSTableReader cleaned = SSTableUtils.prepare() + .ks(table.name) + .cf(cfs.columnFamily) + .generation(0) + .write(cleanedEntries); + SSTableReader streamed = cfs.getSSTables().iterator().next(); + SSTableUtils.assertContentEquals(cleaned, streamed); } @Test @@ -208,4 +312,9 @@ public class StreamingTransferTest exten assertEquals(entry.getKey(), rows.get(0).key); } } + + public interface Mutator + { + public void mutate(String key, String col, long timestamp) throws Exception; + } }