Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: CHANGES.txt src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/724eabed Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/724eabed Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/724eabed Branch: refs/heads/trunk Commit: 724eabedc2e4e0e0a504463ece035bf1656ded77 Parents: 1ebbaea 0ff7f99 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Mar 4 11:51:54 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Mar 4 11:51:54 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ conf/cassandra-env.sh | 3 ++ .../cql3/statements/UpdateStatement.java | 12 +++--- .../org/apache/cassandra/db/SuperColumns.java | 5 +++ .../db/columniterator/IndexedSliceReader.java | 45 +++++++++++++++++--- .../db/columniterator/SSTableNamesIterator.java | 6 ++- .../io/compress/CompressedSequentialWriter.java | 2 +- .../cassandra/io/sstable/SSTableLoader.java | 7 ++- .../org/apache/cassandra/repair/RepairJob.java | 21 +++++---- .../apache/cassandra/repair/RepairSession.java | 2 +- .../apache/cassandra/streaming/StreamPlan.java | 11 ++++- .../cassandra/streaming/StreamResultFuture.java | 7 ++- .../org/apache/cassandra/tools/BulkLoader.java | 7 +-- .../CompressedRandomAccessReaderTest.java | 42 ++++++++++++++++++ 14 files changed, 139 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 53e1e6f,8eb10cd..ba1e72b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -35,46 -27,28 +35,49 @@@ Merged from 2.0 * Optimize single partition batch statements (CASSANDRA-6737) * Disallow post-query re-ordering when paging (CASSANDRA-6722) * Fix potential paging bug with deleted columns (CASSANDRA-6748) - * Fix NPE on BulkLoader caused by 
losing StreamEvent (CASSANDRA-6636) - * Fix truncating compression metadata (CASSANDRA-6791) + * Catch memtable flush exceptions during shutdown (CASSANDRA-6735) + * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645) + * Fix UPDATE updating PRIMARY KEY columns implicitly (CASSANDRA-6782) + * Fix IllegalArgumentException when updating from 1.2 with SuperColumns + (CASSANDRA-6733) -Merged from 1.2: - * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541) - * Catch memtable flush exceptions during shutdown (CASSANDRA-6735) - * Fix broken streams when replacing with same IP (CASSANDRA-6622) - * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645) - * Fix partition and range deletes not triggering flush (CASSANDRA-6655) - * Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667) - * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666) - * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649) - * Don't exchange schema between nodes with different versions (CASSANDRA-6695) - * Use real node messaging versions for schema exchange decisions (CASSANDRA-6700) - * IN on the last clustering columns + ORDER BY DESC yield no results (CASSANDRA-6701) - * Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711) - * Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713) - * Support negative timestamps for CQL3 dates in query string (CASSANDRA-6718) - * Avoid NPEs when receiving table changes for an unknown keyspace (CASSANDRA-5631) - * Fix bootstrapping when there is no schema (CASSANDRA-6685) + + +2.1.0-beta1 + * Add flush directory distinct from compaction directories (CASSANDRA-6357) + * Require JNA by default (CASSANDRA-6575) + * add listsnapshots command to nodetool (CASSANDRA-5742) + * Introduce AtomicBTreeColumns (CASSANDRA-6271, 6692) + * Multithreaded commitlog (CASSANDRA-3578) + * allocate fixed index summary memory pool and resample cold index summaries + to 
use less memory (CASSANDRA-5519) + * Removed multithreaded compaction (CASSANDRA-6142) + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337) + * change logging from log4j to logback (CASSANDRA-5883) + * switch to LZ4 compression for internode communication (CASSANDRA-5887) + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971) + * Remove 1.2 network compatibility code (CASSANDRA-5960) + * Remove leveled json manifest migration code (CASSANDRA-5996) + * Remove CFDefinition (CASSANDRA-6253) + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278) + * User-defined types for CQL3 (CASSANDRA-5590) + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406) + * Batch read from OTC's queue and cleanup (CASSANDRA-1632) + * Secondary index support for collections (CASSANDRA-4511, 6383) + * SSTable metadata(Stats.db) format change (CASSANDRA-6356) + * Push composites support in the storage engine + (CASSANDRA-5417, CASSANDRA-6520) + * Add snapshot space used to cfstats (CASSANDRA-6231) + * Add cardinality estimator for key count estimation (CASSANDRA-5906) + * CF id is changed to be non-deterministic. 
Data dir/key cache are created + uniquely for CF id (CASSANDRA-5202) + * New counters implementation (CASSANDRA-6504) + * Replace UnsortedColumns, EmptyColumns, TreeMapBackedSortedColumns with new + ArrayBackedSortedColumns (CASSANDRA-6630, CASSANDRA-6662, CASSANDRA-6690) + * Add option to use row cache with a given amount of rows (CASSANDRA-5357) + * Avoid repairing already repaired data (CASSANDRA-5351) + * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649) + * Replace index_interval with min/max_index_interval (CASSANDRA-6379) + * Lift limitation that order by columns must be selected for IN queries (CASSANDRA-4911) 2.0.5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/conf/cassandra-env.sh ---------------------------------------------------------------------- diff --cc conf/cassandra-env.sh index 20f26da,934e463..6b26099 --- a/conf/cassandra-env.sh +++ b/conf/cassandra-env.sh @@@ -165,9 -165,12 +165,12 @@@ JVM_OPTS="$JVM_OPTS -ea if [ "$JVM_VENDOR" != "OpenJDK" -o "$JVM_VERSION" \> "1.6.0" ] \ || [ "$JVM_VERSION" = "1.6.0" -a "$JVM_PATCH_VERSION" -ge 23 ] then - JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.5.jar" + JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.6.jar" fi + # some JVMs will fill up their heap when accessed via JMX, see CASSANDRA-6541 + JVM_OPTS="$JVM_OPTS -XX:+CMSClassUnloadingEnabled" + # enable thread priorities, primarily so we can give periodic tasks # a lower priority to avoid interfering with client workload JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities" http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 984f4df,fc9bb66..85ca069 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@@ -47,22 -45,29 +47,24 @@@ public class UpdateStatement extends Mo return true; } - public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) + public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException { - CFDefinition cfDef = cfm.getCfDef(); - // Inserting the CQL row marker (see #4361) - // We always need to insert a marker, because of the following situation: + // We always need to insert a marker for INSERT, because of the following situation: // CREATE TABLE t ( k int PRIMARY KEY, c text ); // INSERT INTO t(k, c) VALUES (1, 1) // DELETE c FROM t WHERE k = 1; // SELECT * FROM t; - // The last query should return one row (but with c == null). Adding - // the marker with the insert make sure the semantic is correct (while making sure a - // 'DELETE FROM t WHERE k = 1' does remove the row entirely) + // The last query should return one row (but with c == null). Adding the marker with the insert makes sure + // the semantic is correct (while making sure a 'DELETE FROM t WHERE k = 1' does remove the row entirely) + // + // We do not insert the marker for UPDATE however, as this amounts to updating the columns in the WHERE + // clause which is unintuitive (#6782) // // We never insert markers for Super CF as this would confuse the thrift side. 
- if (cfm.isCQL3Table() && !prefix.isStatic()) - if (type == StatementType.INSERT && cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper()) - { - ByteBuffer name = builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build(); - cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER)); - } ++ if (type == StatementType.INSERT && cfm.isCQL3Table() && !prefix.isStatic()) + cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER)); List<Operation> updates = getOperations(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/db/SuperColumns.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SuperColumns.java index b854856,513db38..46c5577 --- a/src/java/org/apache/cassandra/db/SuperColumns.java +++ b/src/java/org/apache/cassandra/db/SuperColumns.java @@@ -124,47 -205,159 +124,52 @@@ public class SuperColumn } } - public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn) + private static CellNameType subType(CellNameType type) { - return getComparatorFor(metadata, superColumn != null); + return new SimpleDenseCellNameType(type.subtype(1)); } - public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn) ++ public static CellNameType scNameType(CellNameType type) + { - return metadata.isSuper() - ? ((CompositeType)metadata.comparator).types.get(subColumn ? 1 : 0) - : metadata.comparator; ++ return new SimpleDenseCellNameType(type.subtype(0)); + } + - // Extract the first component of a columnName, i.e. the super column name - public static ByteBuffer scName(ByteBuffer columnName) + public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn) { - return CompositeType.extractComponent(columnName, 0); + return getComparatorFor(metadata, superColumn != null); } - // Extract the 2nd component of a columnName, i.e. 
the sub-column name - public static ByteBuffer subName(ByteBuffer columnName) + public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn) { - return CompositeType.extractComponent(columnName, 1); + return metadata.isSuper() + ? metadata.comparator.subtype(subColumn ? 1 : 0) + : metadata.comparator.asAbstractType(); } - // We don't use CompositeType.Builder mostly because we want to avoid having to provide the comparator. - public static ByteBuffer startOf(ByteBuffer scName) + // Extract the first component of a columnName, i.e. the super column name + public static ByteBuffer scName(Composite columnName) { - int length = scName.remaining(); - ByteBuffer bb = ByteBuffer.allocate(2 + length + 1); - - bb.put((byte) ((length >> 8) & 0xFF)); - bb.put((byte) (length & 0xFF)); - bb.put(scName.duplicate()); - bb.put((byte) 0); - bb.flip(); - return bb; + return columnName.get(0); } - public static ByteBuffer endOf(ByteBuffer scName) + // Extract the 2nd component of a columnName, i.e. 
the sub-column name + public static ByteBuffer subName(Composite columnName) { - ByteBuffer bb = startOf(scName); - bb.put(bb.remaining() - 1, (byte)1); - return bb; + return columnName.get(1); } - public static SCFilter filterToSC(CompositeType type, IDiskAtomFilter filter) + public static Composite startOf(ByteBuffer scName) { - if (filter instanceof NamesQueryFilter) - return namesFilterToSC(type, (NamesQueryFilter)filter); - else - return sliceFilterToSC(type, (SliceQueryFilter)filter); + return CellNames.compositeDense(scName).start(); } - public static SCFilter namesFilterToSC(CompositeType type, NamesQueryFilter filter) + public static Composite endOf(ByteBuffer scName) { - ByteBuffer scName = null; - SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(filter.columns.comparator()); - for (ByteBuffer name : filter.columns) - { - ByteBuffer newScName = scName(name); - - if (scName == null) - { - scName = newScName; - } - else if (type.types.get(0).compare(scName, newScName) != 0) - { - // If we're selecting column across multiple SC, it's not something we can translate for an old node - throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first."); - } - - newColumns.add(subName(name)); - } - return new SCFilter(scName, new NamesQueryFilter(newColumns)); + return CellNames.compositeDense(scName).end(); } - public static SCFilter sliceFilterToSC(CompositeType type, SliceQueryFilter filter) - { - /* - * There is 3 main cases that we can translate back into super column - * queries: - * 1) We have only one slice where the first component of start and - * finish is the same, we translate as a slice query on one SC. - * 2) We have only one slice, neither the start and finish have a 2nd - * component, and end has the 'end of component' set, we translate - * as a slice of SCs. 
- * 3) Each slice has the same first component for start and finish, no - * 2nd component and each finish has the 'end of component' set, we - * translate as a names query of SCs (the filter must then not be reversed). - * Otherwise, we can't do much. - */ - - boolean reversed = filter.reversed; - if (filter.slices.length == 1) - { - ByteBuffer start = filter.slices[0].start; - ByteBuffer finish = filter.slices[0].start; - - if (filter.compositesToGroup == 1) - { - // Note: all the resulting filter must have compositeToGroup == 0 because this - // make no sense for super column on the destination node otherwise - if (start.remaining() == 0) - { - if (finish.remaining() == 0) - // An 'IdentityFilter', keep as is (except for the compositeToGroup) - return new SCFilter(null, new SliceQueryFilter(filter.start(), filter.finish(), reversed, filter.count)); - - if (subName(finish) == null - && ((!reversed && !firstEndOfComponent(finish)) || (reversed && firstEndOfComponent(finish)))) - return new SCFilter(null, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, scName(finish), reversed, filter.count)); - } - else if (finish.remaining() == 0) - { - if (subName(start) == null - && ((!reversed && firstEndOfComponent(start)) || (reversed && !firstEndOfComponent(start)))) - return new SCFilter(null, new SliceQueryFilter(scName(start), ByteBufferUtil.EMPTY_BYTE_BUFFER, reversed, filter.count)); - } - else if (subName(start) == null && subName(finish) == null - && (( reversed && !firstEndOfComponent(start) && firstEndOfComponent(finish)) - || (!reversed && firstEndOfComponent(start) && !firstEndOfComponent(finish)))) - { - // A slice of supercolumns - return new SCFilter(null, new SliceQueryFilter(scName(start), scName(finish), reversed, filter.count)); - } - } - else if (filter.compositesToGroup == 0 && type.types.get(0).compare(scName(start), scName(finish)) == 0) - { - // A slice of subcolumns - return new SCFilter(scName(start), filter.withUpdatedSlice(subName(start), 
subName(finish))); - } - } - else if (!reversed) - { - SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(type.types.get(0)); - for (int i = 0; i < filter.slices.length; ++i) - { - ByteBuffer start = filter.slices[i].start; - ByteBuffer finish = filter.slices[i].finish; - - if (subName(start) != null || subName(finish) != null - || type.types.get(0).compare(scName(start), scName(finish)) != 0 - || firstEndOfComponent(start) || !firstEndOfComponent(finish)) - throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first."); - - columns.add(scName(start)); - } - return new SCFilter(null, new NamesQueryFilter(columns)); - } - throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first."); - } - public static IDiskAtomFilter fromSCFilter(CompositeType type, ByteBuffer scName, IDiskAtomFilter filter) + public static IDiskAtomFilter fromSCFilter(CellNameType type, ByteBuffer scName, IDiskAtomFilter filter) { if (filter instanceof NamesQueryFilter) return fromSCNamesFilter(type, scName, (NamesQueryFilter)filter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java index 2a596ea,b6aa085..9c1fece --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java @@@ -25,9 -27,8 +25,10 @@@ import java.util.List import com.google.common.collect.AbstractIterator; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.composites.CellNameType; ++import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.filter.ColumnSlice; -import 
org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; @@@ -178,6 -179,34 +179,34 @@@ class IndexedSliceReader extends Abstra } } - static int indexFor(SSTableReader sstable, ByteBuffer name, List<IndexHelper.IndexInfo> indexes, AbstractType<?> comparator, boolean reversed, int startIdx) ++ static int indexFor(SSTableReader sstable, Composite name, List<IndexHelper.IndexInfo> indexes, CellNameType comparator, boolean reversed, int startIdx) + { + // If it's a super CF and the sstable is from the old format, then the index will contain old format info, i.e. non composite + // SC names. So we need to 1) use only the SC name part of the comparator and 2) extract only that part from 'name' + if (sstable.metadata.isSuper() && sstable.descriptor.version.hasSuperColumns) + { - AbstractType<?> scComparator = SuperColumns.getComparatorFor(sstable.metadata, false); - ByteBuffer scName = SuperColumns.scName(name); ++ CellNameType scComparator = SuperColumns.scNameType(comparator); ++ Composite scName = CellNames.compositeDense(SuperColumns.scName(name)); + return IndexHelper.indexFor(scName, indexes, scComparator, reversed, startIdx); + } + return IndexHelper.indexFor(name, indexes, comparator, reversed, startIdx); + } + - static ByteBuffer forIndexComparison(SSTableReader sstable, ByteBuffer name) ++ static Composite forIndexComparison(SSTableReader sstable, Composite name) + { + // See indexFor above. + return sstable.metadata.isSuper() && sstable.descriptor.version.hasSuperColumns - ? SuperColumns.scName(name) ++ ? 
CellNames.compositeDense(SuperColumns.scName(name)) + : name; + } + - static AbstractType<?> comparatorForIndex(SSTableReader sstable, AbstractType<?> comparator) ++ static CellNameType comparatorForIndex(SSTableReader sstable, CellNameType comparator) + { + return sstable.metadata.isSuper() && sstable.descriptor.version.hasSuperColumns - ? SuperColumns.getComparatorFor(sstable.metadata, false) ++ ? SuperColumns.scNameType(comparator) + : comparator; + } + private abstract class BlockFetcher { protected int currentSliceIdx; @@@ -212,22 -241,28 +241,28 @@@ return isBeforeSliceStart(column.name()); } - protected boolean isBeforeSliceStart(ByteBuffer name) + protected boolean isBeforeSliceStart(Composite name) { - ByteBuffer start = currentStart(); - return start.remaining() != 0 && comparator.compare(name, start) < 0; + Composite start = currentStart(); + return !start.isEmpty() && comparator.compare(name, start) < 0; } - protected boolean isIndexEntryBeforeSliceStart(ByteBuffer name) ++ protected boolean isIndexEntryBeforeSliceStart(Composite name) + { - ByteBuffer start = currentStart(); - return start.remaining() != 0 && comparatorForIndex(sstable, comparator).compare(name, forIndexComparison(sstable, start)) < 0; ++ Composite start = currentStart(); ++ return !start.isEmpty() && comparatorForIndex(sstable, comparator).compare(name, forIndexComparison(sstable, start)) < 0; + } + protected boolean isColumnBeforeSliceFinish(OnDiskAtom column) { - ByteBuffer finish = currentFinish(); - return finish.remaining() == 0 || comparator.compare(column.name(), finish) <= 0; + Composite finish = currentFinish(); + return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0; } - protected boolean isAfterSliceFinish(Composite name) - protected boolean isIndexEntryAfterSliceFinish(ByteBuffer name) ++ protected boolean isIndexEntryAfterSliceFinish(Composite name) { - ByteBuffer finish = currentFinish(); - return finish.remaining() != 0 && comparatorForIndex(sstable, 
comparator).compare(name, forIndexComparison(sstable, finish)) > 0; + Composite finish = currentFinish(); - return !finish.isEmpty() && comparator.compare(name, finish) > 0; ++ return !finish.isEmpty() && comparatorForIndex(sstable, comparator).compare(name, forIndexComparison(sstable, finish)) > 0; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java index 2eb6745,2e84d8d..374dedb --- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java @@@ -181,12 -184,12 +181,12 @@@ public class SSTableNamesIterator exten throws IOException { /* get the various column ranges we have to read */ - AbstractType<?> comparator = metadata.comparator; + CellNameType comparator = metadata.comparator; List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>(); int lastIndexIdx = -1; - for (ByteBuffer name : columns) + for (CellName name : columnNames) { - int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx); + int index = IndexedSliceReader.indexFor(sstable, name, indexList, comparator, false, lastIndexIdx); if (index < 0 || index == indexList.size()) continue; IndexHelper.IndexInfo indexInfo = indexList.get(index); http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/724eabed/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index 45b670d,3c9dfe5..22850bd --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@@ -23,10 -24,10 +24,11 @@@ import java.util.Random import org.junit.Test; +import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.marshal.BytesType; + import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.SSTableMetadata; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.*; import static org.junit.Assert.assertEquals; @@@ -49,6 -50,46 +51,46 @@@ public class CompressedRandomAccessRead testResetAndTruncate(File.createTempFile("compressed", "1"), true, 10); testResetAndTruncate(File.createTempFile("compressed", "2"), true, 
CompressionParameters.DEFAULT_CHUNK_LENGTH); } + @Test + public void test6791() throws IOException, ConfigurationException + { + File f = File.createTempFile("compressed6791_", "3"); + String filename = f.getAbsolutePath(); + try + { + - SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null); ++ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null); + CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector); + + for (int i = 0; i < 20; i++) + writer.write("x".getBytes()); + + FileMark mark = writer.mark(); + // write enough garbage to create new chunks: + for (int i = 0; i < 40; ++i) + writer.write("y".getBytes()); + + writer.resetAndTruncate(mark); + + for (int i = 0; i < 20; i++) + writer.write("x".getBytes()); + writer.close(); + + CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true)); + String res = reader.readLine(); + assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); + assertEquals(40, res.length()); + } + finally + { + // cleanup + if (f.exists()) + f.delete(); + File metadata = new File(filename+ ".metadata"); + if (metadata.exists()) + metadata.delete(); + } + } private void testResetAndTruncate(File f, boolean compressed, int junkSize) throws IOException {