Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 29ed6fe2e -> 882df8a21
Avoid writing range tombstones after END_OF_ROW marker. Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-10791 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26ca68 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26ca68 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26ca68 Branch: refs/heads/cassandra-2.2 Commit: 0b26ca68747cdecb907d7c238e04b39836efe3d1 Parents: 5175326 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Tue Dec 1 11:38:09 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Dec 2 14:59:11 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnIndex.java | 16 ++-- .../org/apache/cassandra/db/RangeTombstone.java | 5 ++ .../cassandra/db/compaction/Scrubber.java | 25 +++++- .../io/sstable/SSTableIdentityIterator.java | 79 +++++++++++++++---- .../cassandra/io/sstable/SSTableWriter.java | 2 + .../Keyspace1-Standard3-jb-1-Summary.db | Bin 71 -> 63 bytes .../Keyspace1-StandardInteger1-ka-2-CRC.db | Bin 0 -> 8 bytes .../Keyspace1-StandardInteger1-ka-2-Data.db | Bin 0 -> 12357 bytes .../Keyspace1-StandardInteger1-ka-2-Digest.sha1 | 1 + .../Keyspace1-StandardInteger1-ka-2-Filter.db | Bin 0 -> 176 bytes .../Keyspace1-StandardInteger1-ka-2-Index.db | Bin 0 -> 108 bytes ...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes .../Keyspace1-StandardInteger1-ka-2-Summary.db | Bin 0 -> 80 bytes .../Keyspace1-StandardInteger1-ka-2-TOC.txt | 8 ++ .../apache/cassandra/db/RowIndexEntryTest.java | 1 + .../unit/org/apache/cassandra/db/ScrubTest.java | 57 ++++++++++++- .../streaming/StreamingTransferTest.java | 46 ++++++++++- 18 files changed, 215 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b0f9588..e00abfe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791) * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768) * Add proper error handling to stream receiver (CASSANDRA-10774) * Warn or fail when changing cluster topology live (CASSANDRA-10243) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 0ea5c87..f63dfe1 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -147,6 +147,7 @@ public class ColumnIndex add(tombstone); tombstone = rangeIter.hasNext() ? rangeIter.next() : null; } + finishAddingAtoms(); ColumnIndex index = build(); maybeWriteEmptyRowHeader(); @@ -167,6 +168,7 @@ public class ColumnIndex OnDiskAtom c = columns.next(); add(c); } + finishAddingAtoms(); return build(); } @@ -218,15 +220,19 @@ public class ColumnIndex } } - public ColumnIndex build() throws IOException + public void finishAddingAtoms() throws IOException { - // all columns were GC'd after all - if (lastColumn == null) - return ColumnIndex.EMPTY; - long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer); endPosition += size; blockSize += size; + } + + public ColumnIndex build() + { + assert !tombstoneTracker.hasUnwrittenTombstones(); // finishAddingAtoms must be called before building. + // all columns were GC'd after all + if (lastColumn == null) + return ColumnIndex.EMPTY; // the last column may have fallen on an index boundary already. if not, index it explicitly. if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index 4d22d48..5e41792 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -325,6 +325,11 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements return false; } + public boolean hasUnwrittenTombstones() + { + return !unwrittenTombstones.isEmpty(); + } + /** * The tracker needs to track expired range tombstone but keep tracks that they are * expired, so this is what this class is used for. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 400df08..e02f901 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -155,6 +155,22 @@ public class Scrubber implements Closeable if (scrubInfo.isStopRequested()) throw new CompactionInterruptedException(scrubInfo.getCompactionInfo()); + updateIndexKey(); + + if (prevKey != null && indexFile != null) + { + long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex; + if (dataFile.getFilePointer() < nextRowStart) + { + // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table. + saveOutOfOrderRow(prevKey, + SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns), + String.format("Row fragment detected after END_OF_ROW at key %s", prevKey)); + if (dataFile.isEOF()) + break; + } + } + long rowStart = dataFile.getFilePointer(); outputHandler.debug("Reading row at " + rowStart); @@ -170,8 +186,6 @@ public class Scrubber implements Closeable // check for null key below } - updateIndexKey(); - long dataStart = dataFile.getFilePointer(); long dataStartFromIndex = -1; @@ -369,8 +383,13 @@ public class Scrubber implements Closeable private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms) { + saveOutOfOrderRow(key, atoms, String.format("Out of order row detected (%s found after %s)", key, prevKey)); + } + + void saveOutOfOrderRow(DecoratedKey key, SSTableIdentityIterator atoms, String message) + { // TODO bitch if the row is too large? if it is there's not much we can do ... - outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey)); + outputHandler.warn(message); // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often // and there's no sense in failing on mis-sorted cells when a TreeMap could safe us ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 498ad26..45994d0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -20,9 +20,13 @@ package org.apache.cassandra.io.sstable; import java.io.*; import java.util.Iterator; +import com.google.common.collect.AbstractIterator; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.io.sstable.Descriptor.Version; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.serializers.MarshalException; @@ -66,6 +70,35 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL); } + /** + * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker. + */ + public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData) + { + final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL; + final CellNameType type = sstable.metadata.comparator; + final int expireBefore = (int) (System.currentTimeMillis() / 1000); + final Version version = sstable.descriptor.version; + final long dataEnd = file.getFilePointer() + dataSize; + return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE, + new AbstractIterator<OnDiskAtom>() + { + protected OnDiskAtom computeNext() + { + if (file.getFilePointer() >= dataEnd) + return endOfData(); + try + { + return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version); + } + catch (IOException e) + { + throw new IOError(e); + } + } + }); + } + // sstable may be null *if* checkData is false // If it is null, we assume the data is in the current file format private SSTableIdentityIterator(CFMetaData metadata, @@ -77,23 +110,15 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat SSTableReader sstable, ColumnSerializer.Flag flag) { - assert !checkData || (sstable != null); - this.in = in; - this.filename = filename; - this.key = key; - this.dataSize = dataSize; - this.flag = flag; - this.validateColumns = checkData; - this.sstable = sstable; - - Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version; - int expireBefore = (int) (System.currentTimeMillis() / 1000); - columnFamily = ArrayBackedSortedColumns.factory.create(metadata); + this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename), + metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version)); + } + private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename) + { try { - columnFamily.delete(DeletionTime.serializer.deserialize(in)); - atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion); + return DeletionTime.serializer.deserialize(in); } catch (IOException e) { @@ -103,6 +128,32 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat } } + // sstable may be null *if* checkData is false + // If it is null, we assume the data is in the current file format + private SSTableIdentityIterator(CFMetaData metadata, + DataInput in, + String filename, + DecoratedKey key, + long dataSize, + boolean checkData, + SSTableReader sstable, + ColumnSerializer.Flag flag, + DeletionTime deletion, + Iterator<OnDiskAtom> atomIterator) + { + assert !checkData || (sstable != null); + this.in = in; + this.filename = filename; + this.key = key; + this.dataSize = dataSize; + this.flag = flag; + this.validateColumns = checkData; + this.sstable = sstable; + columnFamily = ArrayBackedSortedColumns.factory.create(metadata); + columnFamily.delete(deletion); + this.atomIterator = atomIterator; + } + public DecoratedKey getKey() { return key; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 8e0b5f7..8620f30 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import com.google.common.collect.Sets; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -319,6 +320,7 @@ public class SSTableWriter extends SSTable columnIndexer.add(atom); // This write the atom on disk too } + columnIndexer.finishAddingAtoms(); columnIndexer.maybeWriteEmptyRowHeader(); dataFile.stream.writeShort(END_OF_ROW); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db index 376ca9d..7621f07 100644 Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db new file mode 100644 index 0000000..fc23cfe Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db new file mode 100644 index 0000000..a4157d3 Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 new file mode 100644 index 0000000..fb42fa9 --- /dev/null +++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 @@ -0,0 +1 @@ +3265926428 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db new file mode 100644 index 0000000..eb0ae30 Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db new file mode 100644 index 0000000..69a2fce Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db new file mode 100644 index 0000000..1cba196 Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db new file mode 100644 index 0000000..22cfa6a Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt ---------------------------------------------------------------------- diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt new file mode 100644 index 0000000..503f64d --- /dev/null +++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt @@ -0,0 +1,8 @@ +Digest.sha1 +Summary.db +Filter.db +Index.db +Statistics.db +Data.db +CRC.db +TOC.txt http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index 237573e..ce58e11 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -58,6 +58,7 @@ public class RowIndexEntryTest extends SchemaLoader add(column); } while (size < DatabaseDescriptor.getColumnIndexSize() * 3); + finishAddingAtoms(); }}.build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index f8acd22..167671b 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.utils.UUIDGen; + import org.apache.commons.lang3.StringUtils; + import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -50,6 +52,7 @@ import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Scrubber; import org.apache.cassandra.exceptions.WriteTimeoutException; @@ -60,7 +63,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.Util.cellname; import static org.apache.cassandra.Util.column; - import static junit.framework.Assert.assertNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -362,6 +364,59 @@ public class ScrubTest extends SchemaLoader assert rows.size() == 6 : "Got " + rows.size(); } + @Test + public void testScrub10791() throws Exception + { + // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled. + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + String columnFamily = "StandardInteger1"; + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily); + cfs.clearUnsafe(); + + String root = System.getProperty("corrupt-sstable-root"); + assert root != null; + File rootDir = new File(root); + assert rootDir.isDirectory(); + Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL); + CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname); + + // open without validation for scrubbing + Set<Component> components = new HashSet<>(); + components.add(Component.DATA); + components.add(Component.PRIMARY_INDEX); + components.add(Component.FILTER); + components.add(Component.STATS); + components.add(Component.SUMMARY); + components.add(Component.TOC); + SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata); + + Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true); + scrubber.scrub(); + + cfs.loadNewSSTables(); + assertEquals(7, countCells(cfs)); + } + + private int countCells(ColumnFamilyStore cfs) + { + int cellCount = 0; + for (SSTableReader sstable : cfs.getSSTables()) + { + Iterator<OnDiskAtomIterator> it = sstable.getScanner(); + while (it.hasNext()) + { + Iterator<OnDiskAtom> itr = it.next(); + while (itr.hasNext()) + { + ++cellCount; + itr.next(); + } + } + } + return cellCount; + } + private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException { boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 06ebdd3..31dc492 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -26,9 +26,11 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; + import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,9 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.compaction.Scrubber; +import org.apache.cassandra.db.compaction.Scrubber.ScrubResult; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.QueryFilter; @@ -51,8 +56,9 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.FBUtilities; - +import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.concurrent.Refs; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.apache.cassandra.Util.cellname; @@ -265,7 +271,7 @@ public class StreamingTransferTest extends SchemaLoader Keyspace keyspace = Keyspace.open(ks); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); - String key = "key1"; + String key = "key0"; Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key)); // add columns of size slightly less than column_index_size to force insert column index rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2); @@ -274,9 +280,21 @@ public class StreamingTransferTest extends SchemaLoader // add RangeTombstones cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); + cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); + rm.apply(); + + key = "key1"; + rm = new Mutation(ks, ByteBufferUtil.bytes(key)); + // add columns of size slightly less than column_index_size to force insert column index + rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2); + cf = rm.addOrGet(cfname); + // add RangeTombstones + cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000))); rm.apply(); + cfs.forceBlockingFlush(); + int cellCount = countCells(cfs); SSTableReader sstable = cfs.getSSTables().iterator().next(); cfs.clearUnsafe(); transferSSTables(sstable); @@ -284,8 +302,30 @@ public class StreamingTransferTest extends SchemaLoader // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); + // Verify table + assertEquals(cellCount, countCells(cfs)); + List<Row> rows = Util.getRangeSlice(cfs); - assertEquals(1, rows.size()); + assertEquals(2, rows.size()); + } + + private int countCells(ColumnFamilyStore cfs) + { + int cellCount = 0; + for (SSTableReader sstable : cfs.getSSTables()) + { + Iterator<OnDiskAtomIterator> it = sstable.getScanner(); + while (it.hasNext()) + { + Iterator<OnDiskAtom> itr = it.next(); + while (itr.hasNext()) + { + ++cellCount; + itr.next(); + } + } + } + return cellCount; } @Test