make sure we close the last sstablewriter when rows are dropped patch by yukim and jbellis for CASSANDRA-5077
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d96e32b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d96e32b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d96e32b Branch: refs/heads/trunk Commit: 1d96e32b871efaa484246f2a4b6b195648302688 Parents: d6a3845 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Dec 28 12:02:10 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Dec 28 12:02:10 2012 -0500 ---------------------------------------------------------------------- .../cassandra/db/compaction/CompactionTask.java | 30 ++++++++++----- 1 files changed, 20 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d96e32b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index e4e15bc..7168280 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -169,6 +169,7 @@ public class CompactionTask extends AbstractCompactionTask row.close(); continue; } + // If the row is cached, we call removeDeleted on at read time it to have coherent query returns, // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely. controller.removeDeletedInCache(row.key); @@ -187,19 +188,28 @@ public class CompactionTask extends AbstractCompactionTask } } } - if (!iter.hasNext() || newSSTableSegmentThresholdReached(writer)) + + if (newSSTableSegmentThresholdReached(writer)) { - SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact)); - cachedKeyMap.put(toIndex, cachedKeys); - sstables.add(toIndex); - if (iter.hasNext()) - { - writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact); - writers.add(writer); - cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>(); - } + SSTableReader sstable = writer.closeAndOpenReader(getMaxDataAge(toCompact)); + cachedKeyMap.put(sstable, cachedKeys); + sstables.add(sstable); + writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact); + writers.add(writer); + cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>(); } } + + if (writer.getFilePointer() > 0) + { + SSTableReader sstable = writer.closeAndOpenReader(getMaxDataAge(toCompact)); + cachedKeyMap.put(sstable, cachedKeys); + sstables.add(sstable); + } + else + { + writer.abort(); + } } catch (Throwable t) {