Author: jbellis Date: Mon Mar 14 19:24:54 2011 New Revision: 1081525 URL: http://svn.apache.org/viewvc?rev=1081525&view=rev Log: fix tombstone handling in repair and sstable2json patch by slebresne; reviewed by jbellis for CASSANDRA-2279
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081525&r1=1081524&r2=1081525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Mar 14 19:24:54 2011 @@ -1,5 +1,6 @@ 0.7.5 * Avoid seeking when sstable2json exports the entire file (CASSANDRA-2318) + * fix tombstone handling in repair and sstable2json (CASSANDRA-2279) 0.7.4 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1081525&r1=1081524&r2=1081525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java Mon Mar 14 19:24:54 2011 @@ -99,7 +99,6 @@ public class RowIteratorFactory } Iterator<IColumnIterator> collated = IteratorUtils.collatedIterator(COMPARE_BY_KEY, iterators); - final Memtable firstMemtable = memtables.iterator().next(); // reduce rows from all sources into a single row ReducingIterator<IColumnIterator, Row> reduced = new ReducingIterator<IColumnIterator, Row>(collated) @@ -107,11 +106,26 @@ public class RowIteratorFactory private final int 
gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(); private final List<IColumnIterator> colIters = new ArrayList<IColumnIterator>(); private DecoratedKey key; + private ColumnFamily returnCF; + + @Override + protected void onKeyChange() + { + this.returnCF = ColumnFamily.create(cfs.metadata); + } public void reduce(IColumnIterator current) { this.colIters.add(current); this.key = current.getKey(); + try + { + this.returnCF.delete(current.getColumnFamily()); + } + catch (IOException e) + { + throw new IOError(e); + } } @Override @@ -125,7 +139,6 @@ public class RowIteratorFactory Comparator<IColumn> colComparator = filter.filter.getColumnComparator(comparator); Iterator<IColumn> colCollated = IteratorUtils.collatedIterator(colComparator, colIters); - ColumnFamily returnCF; // First check if this row is in the rowCache. If it is we can skip the rest ColumnFamily cached = cfs.getRawCachedRow(key); if (cached != null) @@ -135,33 +148,8 @@ public class RowIteratorFactory } else if (colCollated.hasNext()) { - returnCF = firstMemtable.getColumnFamily(key); - // TODO this is a little subtle: the Memtable ColumnIterator has to be a shallow clone of the source CF, - // with deletion times set correctly, so we can use it as the "base" CF to add query results to. - // (for sstable ColumnIterators we do not care if it is a shallow clone or not.) - returnCF = returnCF == null ? 
ColumnFamily.create(firstMemtable.getTableName(), filter.getColumnFamilyName()) - : returnCF.cloneMeShallow(); - long lastDeletedAt = Long.MIN_VALUE; - for (IColumnIterator columns : colIters) - { - columns.hasNext(); // force cf initializtion - try - { - if (columns.getColumnFamily().isMarkedForDelete()) - lastDeletedAt = Math.max(lastDeletedAt, columns.getColumnFamily().getMarkedForDeleteAt()); - } - catch (IOException e) - { - throw new IOError(e); - } - } - returnCF.markedForDeleteAt.set(lastDeletedAt); filter.collectCollatedColumns(returnCF, colCollated, gcBefore); } - else - { - returnCF = null; - } Row rv = new Row(key, returnCF); colIters.clear(); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1081525&r1=1081524&r2=1081525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Mon Mar 14 19:24:54 2011 @@ -31,9 +31,11 @@ import org.apache.cassandra.db.IColumn; public interface IColumnIterator extends Iterator<IColumn> { /** - * returns the CF of the column being iterated. Do not modify the returned CF; clone first. - * The CF is only guaranteed to be available after a call to next() or hasNext(). - * Guaranteed to be non-null. + * returns the CF of the column being iterated. + * Do not modify the returned CF; clone first. + * This is guaranteed to be non-null and that the returned CF have the correct metadata + * (markedForDeleteAt and localDeletionTime). The full CF is however only guaranteed to + * be available after a call to next() or hasNext(). 
* @throws IOException */ public abstract ColumnFamily getColumnFamily() throws IOException; Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java?rev=1081525&r1=1081524&r2=1081525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java Mon Mar 14 19:24:54 2011 @@ -55,6 +55,7 @@ public abstract class ReducingIterator<T if (last == null && !source.hasNext()) return endOfData(); + onKeyChange(); boolean keyChanged = false; while (!keyChanged) { @@ -73,6 +74,12 @@ public abstract class ReducingIterator<T return getReduced(); } + /** + * Called at the begining of each new key, before any reduce is called. + * To be overriden by implementing classes. 
+ */ + protected void onKeyChange() {} + public Iterator<T2> iterator() { return this; Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java?rev=1081525&r1=1081524&r2=1081525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java (original) +++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java Mon Mar 14 19:24:54 2011 @@ -62,4 +62,51 @@ public class RowIterationTest extends Cl store.forceBlockingFlush(); assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size()); } + + @Test + public void testRowIterationDeletionTime() throws IOException, ExecutionException, InterruptedException + { + Table table = Table.open(TABLE1); + String CF_NAME = "Standard3"; + ColumnFamilyStore store = table.getColumnFamilyStore(CF_NAME); + DecoratedKey key = Util.dk("key"); + + // Delete row in first sstable + RowMutation rm = new RowMutation(TABLE1, key.key); + rm.delete(new QueryPath(CF_NAME, null, null), 0); + rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 0L); + int tstamp1 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime(); + rm.apply(); + store.forceBlockingFlush(); + + // Delete row in second sstable with higher timestamp + rm = new RowMutation(TABLE1, key.key); + rm.delete(new QueryPath(CF_NAME, null, null), 1); + rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 1L); + int tstamp2 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime(); + rm.apply(); + store.forceBlockingFlush(); + + ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf; + assert cf.getMarkedForDeleteAt() == 1L; + assert 
cf.getLocalDeletionTime() == tstamp2; + } + + @Test + public void testRowIterationDeletion() throws IOException, ExecutionException, InterruptedException + { + Table table = Table.open(TABLE1); + String CF_NAME = "Standard3"; + ColumnFamilyStore store = table.getColumnFamilyStore(CF_NAME); + DecoratedKey key = Util.dk("key"); + + // Delete a row in first sstable + RowMutation rm = new RowMutation(TABLE1, key.key); + rm.delete(new QueryPath(CF_NAME, null, null), 0); + rm.apply(); + store.forceBlockingFlush(); + + ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf; + assert cf != null; + } }