make sure we close the last sstablewriter when rows are dropped
patch by yukim and jbellis for CASSANDRA-5077


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d96e32b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d96e32b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d96e32b

Branch: refs/heads/trunk
Commit: 1d96e32b871efaa484246f2a4b6b195648302688
Parents: d6a3845
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Fri Dec 28 12:02:10 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Fri Dec 28 12:02:10 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java    |   30 ++++++++++-----
 1 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d96e32b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index e4e15bc..7168280 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -169,6 +169,7 @@ public class CompactionTask extends AbstractCompactionTask
                     row.close();
                     continue;
                 }
+
                 // If the row is cached, we call removeDeleted on at read time 
it to have coherent query returns,
                 // but if the row is not pushed out of the cache, obsolete 
tombstones will persist indefinitely.
                 controller.removeDeletedInCache(row.key);
@@ -187,19 +188,28 @@ public class CompactionTask extends AbstractCompactionTask
                         }
                     }
                 }
-                if (!iter.hasNext() || 
newSSTableSegmentThresholdReached(writer))
+
+                if (newSSTableSegmentThresholdReached(writer))
                 {
-                    SSTableReader toIndex = 
writer.closeAndOpenReader(getMaxDataAge(toCompact));
-                    cachedKeyMap.put(toIndex, cachedKeys);
-                    sstables.add(toIndex);
-                    if (iter.hasNext())
-                    {
-                        writer = cfs.createCompactionWriter(keysPerSSTable, 
cfs.directories.getLocationForDisk(dataDirectory), toCompact);
-                        writers.add(writer);
-                        cachedKeys = new HashMap<DecoratedKey, 
RowIndexEntry>();
-                    }
+                    SSTableReader sstable = 
writer.closeAndOpenReader(getMaxDataAge(toCompact));
+                    cachedKeyMap.put(sstable, cachedKeys);
+                    sstables.add(sstable);
+                    writer = cfs.createCompactionWriter(keysPerSSTable, 
cfs.directories.getLocationForDisk(dataDirectory), toCompact);
+                    writers.add(writer);
+                    cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
                 }
             }
+
+            if (writer.getFilePointer() > 0)
+            {
+                SSTableReader sstable = 
writer.closeAndOpenReader(getMaxDataAge(toCompact));
+                cachedKeyMap.put(sstable, cachedKeys);
+                sstables.add(sstable);
+            }
+            else
+            {
+                writer.abort();
+            }
         }
         catch (Throwable t)
         {

Reply via email to