Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 c1d2b86a4 -> 2170ac4d3


Make CQLSSTableWriter sync within partitions

patch by slebresne; reviewed by blerer for CASSANDRA-7360


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2170ac4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2170ac4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2170ac4d

Branch: refs/heads/cassandra-2.0
Commit: 2170ac4d3922425f008f441d2d92e3233449c765
Parents: c1d2b86
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Tue Sep 9 18:18:12 2014 -0700
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Sep 9 18:18:12 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/TreeMapBackedSortedColumns.java          |  2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java | 11 ++-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 82 ++++++++++++++++----
 .../io/sstable/SSTableSimpleUnsortedWriter.java | 38 ++++++---
 .../io/sstable/CQLSSTableWriterTest.java        | 39 ++++++++++
 6 files changed, 140 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2c0cae6..3ee938a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.11:
+ * Make CQLSSTableWriter sync within partitions (CASSANDRA-7360)
  * Potentially use non-local replicas in CqlConfigHelper (CASSANDRA-7906)
  * Explicitly disallowing mixing multi-column and single-column
    relations on clustering columns (CASSANDRA-7711)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index 466833b..ae6e798 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -49,7 +49,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return (AbstractType<?>)map.comparator();
     }
 
-    private TreeMapBackedSortedColumns(CFMetaData metadata)
+    protected TreeMapBackedSortedColumns(CFMetaData metadata)
     {
         super(metadata);
         this.map = new TreeMap<ByteBuffer, Column>(metadata.comparator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index db87226..2c6f82a 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -111,7 +111,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
         currentSuperColumn = name;
     }
 
-    private void addColumn(Column column)
+    protected void addColumn(Column column) throws IOException
     {
         if (columnFamily.metadata().isSuper())
         {
@@ -129,7 +129,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
      * @param value the column value
      * @param timestamp the column timestamp
      */
-    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) 
throws IOException
     {
         addColumn(new Column(name, value, timestamp));
     }
@@ -144,7 +144,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
      * expiring the column, and as a consequence should be synchronized with 
the cassandra servers time. If {@code timestamp} represents
      * the insertion time in microseconds (which is not required), this should 
be {@code (timestamp / 1000) + (ttl * 1000)}.
      */
-    public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long 
timestamp, int ttl, long expirationTimestampMS)
+    public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long 
timestamp, int ttl, long expirationTimestampMS) throws IOException
     {
         addColumn(new ExpiringColumn(name, value, timestamp, ttl, 
(int)(expirationTimestampMS / 1000)));
     }
@@ -154,7 +154,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
      * @param name the column name
      * @param value the value of the counter
      */
-    public void addCounterColumn(ByteBuffer name, long value)
+    public void addCounterColumn(ByteBuffer name, long value) throws 
IOException
     {
         addColumn(new CounterColumn(name,
                                     
CounterContext.instance().createRemote(counterid, 1L, value, 
HeapAllocator.instance),
@@ -179,8 +179,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
         return currentKey;
     }
 
-
     protected abstract void writeRow(DecoratedKey key, ColumnFamily 
columnFamily) throws IOException;
 
-    protected abstract ColumnFamily getColumnFamily();
+    protected abstract ColumnFamily getColumnFamily() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 61990ec..49a1259 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -40,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -206,13 +208,22 @@ public class CQLSSTableWriter implements Closeable
                                                        
insert.getTimeToLive(values),
                                                        
Collections.<ByteBuffer, ColumnGroupMap>emptyMap());
 
-        for (ByteBuffer key: keys)
+        try
         {
-            if (writer.currentKey() == null || 
!key.equals(writer.currentKey().key))
-                writer.newRow(key);
-            insert.addUpdateForKey(writer.currentColumnFamily(), key, 
clusteringPrefix, params);
+            for (ByteBuffer key: keys)
+            {
+                if (writer.currentKey() == null || 
!key.equals(writer.currentKey().key))
+                    writer.newRow(key);
+                insert.addUpdateForKey(writer.currentColumnFamily(), key, 
clusteringPrefix, params);
+            }
+            return this;
+        }
+        catch (BufferedWriter.SyncException e)
+        {
+            // If we use a BufferedWriter and had a problem writing to disk, 
the IOException has been
+            // wrapped in a SyncException (see BufferedWriter below). We want 
to extract that IOE.
+            throw (IOException)e.getCause();
         }
-        return this;
     }
 
     /**
@@ -463,21 +474,58 @@ public class CQLSSTableWriter implements Closeable
             if (insert == null)
                 throw new IllegalStateException("No insert statement 
specified, you should provide an insert statement through using()");
 
-            AbstractSSTableSimpleWriter writer;
-            if (sorted)
+            AbstractSSTableSimpleWriter writer = sorted
+                                               ? new 
SSTableSimpleWriter(directory, schema, partitioner)
+                                               : new BufferedWriter(directory, 
schema, partitioner, bufferSizeInMB);
+            return new CQLSSTableWriter(writer, insert, boundNames);
+        }
+    }
+
+    /**
+     * CQLSSTableWriter doesn't use the method addColumn() from 
AbstractSSTableSimpleWriter.
+     * Instead, it adds cells directly to the ColumnFamily the latter exposes. 
But this means
+     * that the sync() method of SSTableSimpleUnsortedWriter is not called (at 
least not for
+     * each CQL row, so adding many rows to the same partition can buffer too 
much data in
+     * memory - #7360). So we create a slightly modified 
SSTableSimpleUnsortedWriter that uses
+     * a tweaked ColumnFamily object that calls back the proper method after 
each added cell
+     * so we sync when we should.
+     */
+    private static class BufferedWriter extends SSTableSimpleUnsortedWriter
+    {
+        public BufferedWriter(File directory, CFMetaData metadata, 
IPartitioner partitioner, long bufferSizeInMB)
+        {
+            super(directory, metadata, partitioner, bufferSizeInMB);
+        }
+
+        @Override
+        protected ColumnFamily createColumnFamily()
+        {
+            return new TreeMapBackedSortedColumns(metadata)
             {
-                writer = new SSTableSimpleWriter(directory,
-                                                 schema,
-                                                 partitioner);
-            }
-            else
+                @Override
+                public void addColumn(Column column, Allocator allocator)
+                {
+                    super.addColumn(column, allocator);
+                    try
+                    {
+                        countColumn(column);
+                    }
+                    catch (IOException e)
+                    {
+                        // addColumn does not throw IOException but we want to 
report this to the user,
+                        // so wrap it in a temporary RuntimeException that 
we'll catch in rawAddRow above.
+                        throw new SyncException(e);
+                    }
+                }
+            };
+        }
+
+        static class SyncException extends RuntimeException
+        {
+            SyncException(IOException ioe)
             {
-                writer = new SSTableSimpleUnsortedWriter(directory,
-                                                         schema,
-                                                         partitioner,
-                                                         bufferSizeInMB);
+                super(ioe);
             }
-            return new CQLSSTableWriter(writer, insert, boundNames);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 6b39024..39ec71d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -27,10 +27,12 @@ import java.util.concurrent.SynchronousQueue;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TreeMapBackedSortedColumns;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.compress.CompressionParameters;
@@ -100,30 +102,48 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) 
throws IOException
     {
-        currentSize += key.key.remaining() + 
ColumnFamily.serializer.serializedSize(columnFamily, 
MessagingService.current_version) * 1.2;
+        // Nothing to do since we'll sync if needed in addColumn.
+    }
+
+    @Override
+    protected void addColumn(Column column) throws IOException
+    {
+        super.addColumn(column);
+        countColumn(column);
+    }
+
+    protected void countColumn(Column column) throws IOException
+    {
+        currentSize += column.serializedSize(TypeSizes.NATIVE);
 
+        // We don't want to sync in writeRow() only as this might blow up the 
bufferSize for wide rows.
         if (currentSize > bufferSize)
             sync();
     }
 
-    protected ColumnFamily getColumnFamily()
+    protected ColumnFamily getColumnFamily() throws IOException
     {
         ColumnFamily previous = buffer.get(currentKey);
         // If the CF already exist in memory, we'll just continue adding to it
         if (previous == null)
         {
-            previous = TreeMapBackedSortedColumns.factory.create(metadata);
+            previous = createColumnFamily();
             buffer.put(currentKey, previous);
-        }
-        else
-        {
-            // We will reuse a CF that we have counted already. But because it 
will be easier to add the full size
-            // of the CF in the next writeRow call than to find out the delta, 
we just remove the size until that next call
-            currentSize -= currentKey.key.remaining() + 
ColumnFamily.serializer.serializedSize(previous, 
MessagingService.current_version) * 1.2;
+
+            // Since this new CF will be written by the next sync(), count its 
header. And a CF header
+            // on disk is:
+            //   - the row key: 2 bytes size + key size bytes
+            //   - the row level deletion infos: 4 + 8 bytes
+            currentSize += 14 + currentKey.key.remaining();
         }
         return previous;
     }
 
+    protected ColumnFamily createColumnFamily() throws IOException
+    {
+        return TreeMapBackedSortedColumns.factory.create(metadata);
+    }
+
     public void close() throws IOException
     {
         sync();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index bdc4b94..de814e1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Iterator;
 
 import com.google.common.collect.ImmutableMap;
@@ -119,4 +122,40 @@ public class CQLSSTableWriterTest
         assertEquals(null, row.getBytes("v1")); // Using getBytes because we 
know it won't NPE
         assertEquals(12, row.getInt("v2"));
     }
+
+    @Test
+    public void testSyncWithinPartition() throws Exception
+    {
+        // Check that the write respect the buffer size even if we only insert 
rows withing the same partition (#7360)
+        // To do that simply, we use a writer with a buffer of 1MB, and write 
2 rows in the same partition with a value
+        // > 1MB and validate that this created more than 1 sstable.
+        File tempdir = Files.createTempDir();
+        String schema = "CREATE TABLE ks.test ("
+                      + "  k int PRIMARY KEY,"
+                      + "  v blob"
+                      + ")";
+        String insert = "INSERT INTO ks.test (k, v) VALUES (?, ?)";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(tempdir)
+                                                  .forTable(schema)
+                                                  
.withPartitioner(StorageService.instance.getPartitioner())
+                                                  .using(insert)
+                                                  .withBufferSizeInMB(1)
+                                                  .build();
+
+        ByteBuffer val = ByteBuffer.allocate(1024 * 1050);
+
+        writer.addRow(0, val);
+        writer.addRow(1, val);
+        writer.close();
+
+        FilenameFilter filterDataFiles = new FilenameFilter()
+        {
+            public boolean accept(File dir, String name)
+            {
+                return name.endsWith("-Data.db");
+            }
+        };
+        assert tempdir.list(filterDataFiles).length > 1 : 
Arrays.toString(tempdir.list(filterDataFiles));
+    }
 }

Reply via email to