Author: slebresne
Date: Fri Nov  4 08:35:33 2011
New Revision: 1197426

URL: http://svn.apache.org/viewvc?rev=1197426&view=rev
Log:
Never return more columns than requested
patch by byronclark; reviewed by slebresne for CASSANDRA-3303 and 3395

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Nov  4 08:35:33 2011
@@ -11,6 +11,7 @@
  * add JMX call to clean (failed) repair sessions (CASSANDRA-3316)
  * fix sstableloader reference acquisition bug (CASSANDRA-3438)
  * fix estimated row size regression (CASSANDRA-3451)
+ * make sure we don't return more columns than asked (CASSANDRA-3303, 3395)
 Merged from 0.8:
  * acquire compactionlock during truncate (CASSANDRA-3399)
  * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java Fri Nov  4 08:35:33 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.IVersione
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.service.RepairCallback;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -66,7 +67,7 @@ public abstract class ReadCommand implem
         this.queryPath = queryPath;
         this.commandType = cmdType;
     }
-    
+
     public boolean isDigestQuery()
     {
         return isDigestQuery;
@@ -81,7 +82,7 @@ public abstract class ReadCommand implem
     {
         return queryPath.columnFamilyName;
     }
-    
+
     public abstract ReadCommand copy();
 
     public abstract Row getRow(Table table) throws IOException;
@@ -95,6 +96,18 @@ public abstract class ReadCommand implem
     {
         return table;
     }
+
+    // maybeGenerateRetryCommand is used to generate a retry for short reads
+    public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+    {
+        return null;
+    }
+
+    // maybeTrim removes columns from a response that is too long
+    public void maybeTrim(Row row)
+    {
+        // noop
+    }
 }
 
 class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Fri Nov  4 08:35:33 2011
@@ -19,17 +19,25 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
 
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.RepairCallback;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SliceFromReadCommand extends ReadCommand
 {
+    static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
+
     public final ByteBuffer start, finish;
     public final boolean reversed;
     public final int count;
@@ -62,6 +70,64 @@ public class SliceFromReadCommand extend
     }
 
     @Override
+    public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+    {
+        int maxLiveColumns = handler.getMaxLiveColumns();
+        int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount() : 0;
+
+        assert maxLiveColumns <= count;
+        if ((maxLiveColumns == count) && (liveColumnsInRow < count))
+        {
+            int retryCount = count + count - liveColumnsInRow;
+            return new RetriedSliceFromReadCommand(table, key, queryPath, start, finish, reversed, count, retryCount);
+        }
+
+        return null;
+    }
+
+    @Override
+    public void maybeTrim(Row row)
+    {
+        if ((row == null) || (row.cf == null))
+            return;
+
+        int liveColumnsInRow = row.cf.getLiveColumnCount();
+
+        if (liveColumnsInRow > getRequestedCount())
+        {
+            int columnsToTrim = liveColumnsInRow - getRequestedCount();
+
+            logger.debug("trimming {} live columns to the originally requested {}", row.cf.getLiveColumnCount(), getRequestedCount());
+
+            Collection<IColumn> columns;
+            if (reversed)
+                columns = row.cf.getSortedColumns();
+            else
+                columns = row.cf.getReverseSortedColumns();
+
+            Collection<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
+
+            Iterator<IColumn> columnIterator = columns.iterator();
+            while (columnIterator.hasNext() && (toRemove.size() < columnsToTrim))
+            {
+                IColumn column = columnIterator.next();
+                if (column.isLive())
+                    toRemove.add(column.name());
+            }
+
+            for (ByteBuffer columnName : toRemove)
+            {
+                row.cf.remove(columnName);
+            }
+        }
+    }
+
+    protected int getRequestedCount()
+    {
+        return count;
+    }
+
+    @Override
     public String toString()
     {
         return "SliceFromReadCommand(" +

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov  4 08:35:33 2011
@@ -685,7 +685,10 @@ public class StorageProxy implements Sto
                     long startTime2 = System.currentTimeMillis();
                     Row row = handler.get();
                     if (row != null)
+                    {
+                        command.maybeTrim(row);
                         rows.add(row);
+                    }
 
                     if (logger.isDebugEnabled())
                         logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
@@ -739,35 +742,21 @@ public class StorageProxy implements Sto
                         throw new AssertionError(e); // full data requested from each node here, no digests should be sent
                     }
 
-                    // retry short reads, otherwise add the row to our 
resultset
-                    if (command instanceof SliceFromReadCommand)
+                    ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
+                    if (retryCommand != null)
                     {
-                        // short reads are only possible on 
SliceFromReadCommand
-                        SliceFromReadCommand sliceCommand = 
(SliceFromReadCommand) command;
-                        int maxLiveColumns = handler.getMaxLiveColumns();
-                        int liveColumnsInRow = row != null ? 
row.cf.getLiveColumnCount() : 0;
-
-                        assert maxLiveColumns <= sliceCommand.count;
-                        if ((maxLiveColumns == sliceCommand.count) && 
(liveColumnsInRow < sliceCommand.count))
-                        {
-                            logger.debug("detected short read: expected {} 
columns, but only resolved {} columns",
-                                         sliceCommand.count, liveColumnsInRow);
+                        logger.debug("issuing retry for read command");
+                        if (commandsToRetry == Collections.EMPTY_LIST)
+                            commandsToRetry = new ArrayList<ReadCommand>();
+                        commandsToRetry.add(retryCommand);
+                        continue;
+                    }
 
-                            int retryCount = sliceCommand.count + 
sliceCommand.count - liveColumnsInRow;
-                            SliceFromReadCommand retryCommand = new 
SliceFromReadCommand(command.table,
-                                                                               
          command.key,
-                                                                               
          command.queryPath,
-                                                                               
          sliceCommand.start,
-                                                                               
          sliceCommand.finish,
-                                                                               
          sliceCommand.reversed,
-                                                                               
          retryCount);
-                            if (commandsToRetry == Collections.EMPTY_LIST)
-                                commandsToRetry = new ArrayList<ReadCommand>();
-                            commandsToRetry.add(retryCommand);
-                            continue;
-                        }
+                    if (row != null)
+                    {
+                        command.maybeTrim(row);
+                        rows.add(row);
                     }
-                    rows.add(row);
                 }
             }
         } while (!commandsToRetry.isEmpty());


Reply via email to