Author: jbellis Date: Thu Aug 11 19:29:34 2011 New Revision: 1156757 URL: http://svn.apache.org/viewvc?rev=1156757&view=rev Log: make sure pre-truncate CL segments are discarded
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1156757&r1=1156756&r2=1156757&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Aug 11 19:29:34 2011 @@ -232,7 +232,7 @@ * Disable compaction throttling during bootstrap (CASSANDRA-2612) * fix CQL treatment of > and < operators in range slices (CASSANDRA-2592) * fix potential double-application of counter updates on commitlog replay - (CASSANDRA-2419) + by moving replay position from header to sstable metadata (CASSANDRA-2419) * JDBC CQL driver exposes getColumn for access to timestamp * JDBC ResultSetMetadata properties added to AbstractType * r/m clustertool (CASSANDRA-2607) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1156757&r1=1156756&r2=1156757&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Aug 11 19:29:34 2011 @@ -581,6 +581,7 @@ public class ColumnFamilyStore implement assert getMemtableThreadSafe() == oldMemtable; oldMemtable.freeze(); final ReplayPosition ctx = writeCommitLog ? CommitLog.instance.getContext() : ReplayPosition.NONE; + logger.debug("flush position is {}", ctx); // submit the memtable for any indexed sub-cfses, and our own. 
List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>(); @@ -1532,6 +1533,37 @@ public class ColumnFamilyStore implement } /** + * Waits for flushes started BEFORE THIS METHOD IS CALLED to finish. + * Does NOT guarantee that no flush is active when it returns. + */ + private void waitForActiveFlushes() + { + Future<?> future; + Table.switchLock.writeLock().lock(); + try + { + future = postFlushExecutor.submit(new Runnable() { public void run() { } }); + } + finally + { + Table.switchLock.writeLock().unlock(); + } + + try + { + future.get(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (ExecutionException e) + { + throw new AssertionError(e); + } + } + + /** * Truncate practically deletes the entire column family's data * @return a Future to the delete operation. Call the future's get() to make * sure the column family has been deleted @@ -1544,14 +1576,33 @@ public class ColumnFamilyStore implement // We accomplish this by first flushing manually, then snapshotting, and // recording the timestamp IN BETWEEN those actions. Any sstables created // with this timestamp or greater time, will not be marked for delete. - try - { - forceBlockingFlush(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + // + // Bonus complication: since we store replay position in sstable metadata, + // truncating those sstables means we will replay any CL segments from the + // beginning if we restart before they are discarded for normal reasons + // post-truncate. So we need to (a) force a new segment so the currently + // active one can be discarded, and (b) flush *all* CFs so that unflushed + // data in others don't keep any pre-truncate CL segments alive. + // + // Bonus bonus: simply forceFlush of all the CFs is not enough, because if + // for a given column family the memtable is clean, forceFlush will return + // immediately, even though there could be a memtable being flushed at the same + // time. 
So to guarantee that all segments can be cleaned out, we need + "waitForActiveFlushes" after the new segment has been created. + CommitLog.instance.forceNewSegment(); + waitForActiveFlushes(); + List<Future<?>> futures = new ArrayList<Future<?>>(); + ReplayPosition position = CommitLog.instance.getContext(); + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + { + Future<?> f = cfs.forceFlush(); + if (f != null) + futures.add(f); + } + FBUtilities.waitOnFutures(futures); + // if everything was clean, flush won't have called discard + CommitLog.instance.discardCompletedSegments(metadata.cfId, position); + // sleep a little to make sure that our truncatedAt comes after any sstable // that was part of the flushes we forced; otherwise on a tie, it won't get deleted. try Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1156757&r1=1156756&r2=1156757&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Aug 11 19:29:34 2011 @@ -81,9 +81,14 @@ public class SystemTable ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(dotSeven); if (cf == null) { - // upgrading from 0.6 to 0.7. - logger.info("Upgrading to 0.7. Purging hints if there are any. Old hints will be snapshotted."); - new Truncation(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF).apply(); + // 0.7+ marker not found. Remove hints and add the marker. + ColumnFamilyStore hintsCfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HintedHandOffManager.HINTS_CF); + if (hintsCfs.getSSTables().size() > 0) + { + logger.info("Possible 0.6-format hints found. 
Snapshotting as 'old-hints' and purging"); + hintsCfs.snapshot("old-hints"); + hintsCfs.removeAllSSTables(); + } RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, COOKIE_KEY); rm.add(new QueryPath(STATUS_CF, null, hintsPurged6to7), ByteBufferUtil.bytes("oh yes, it they were purged."), System.currentTimeMillis()); rm.apply(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1156757&r1=1156756&r2=1156757&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Aug 11 19:29:34 2011 @@ -496,7 +496,6 @@ public class CommitLog implements Commit } } - void sync() throws IOException { currentSegment().sync(); @@ -532,6 +531,50 @@ public class CommitLog implements Commit return getSize(); } + public void forceNewSegment() + { + Callable<?> task = new Callable() + { + public Object call() throws IOException + { + createNewSegment(); + return null; + } + }; + + try + { + executor.submit(task).get(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + } + + private void createNewSegment() throws IOException + { + sync(); + segments.add(new CommitLogSegment()); + + // Maintain desired CL size cap + if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024) + { + // Force a flush on all CFs keeping the oldest segment from being removed + CommitLogSegment oldestSegment = segments.peek(); + assert oldestSegment != null; // has to be at least the one we just added + for (Integer dirtyCFId : oldestSegment.cfLastWrite.keySet()) + { + String keypace = CFMetaData.getCF(dirtyCFId).left; + 
Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush(); + } + } + } + // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that // without breaking the fragile CheaterFutureTask in BatchCLES. class LogRecordAdder implements Callable, Runnable @@ -550,23 +593,7 @@ public class CommitLog implements Commit currentSegment().write(rowMutation); // roll log if necessary if (currentSegment().length() >= SEGMENT_SIZE) - { - sync(); - segments.add(new CommitLogSegment()); - - // Maintain desired CL size cap - if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024) - { - // Force a flush on all CFs keeping the oldest segment from being removed - CommitLogSegment oldestSegment = segments.peek(); - assert oldestSegment != null; // has to be at least the one we just added - for (Integer dirtyCFId : oldestSegment.cfLastWrite.keySet()) - { - String keypace = CFMetaData.getCF(dirtyCFId).left; - Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush(); - } - } - } + createNewSegment(); } catch (IOException e) { Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java?rev=1156757&r1=1156756&r2=1156757&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java Thu Aug 11 19:29:34 2011 @@ -73,6 +73,7 @@ public class RecoveryManagerTruncateTest rm.apply(); cfs.forceBlockingFlush(); cfs.truncate().get(); + CommitLog.instance.resetUnsafe(); CommitLog.recover(); assertNull(getFromTable(table, "Standard1", "keymulti", "col1")); }