Author: slebresne Date: Fri Nov 4 08:35:33 2011 New Revision: 1197426 URL: http://svn.apache.org/viewvc?rev=1197426&view=rev Log: Never return more columns than requested. Patch by byronclark; reviewed by slebresne for CASSANDRA-3303 and CASSANDRA-3395.
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1197426&r1=1197425&r2=1197426&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Nov 4 08:35:33 2011 @@ -11,6 +11,7 @@ * add JMX call to clean (failed) repair sessions (CASSANDRA-3316) * fix sstableloader reference acquisition bug (CASSANDRA-3438) * fix estimated row size regression (CASSANDRA-3451) + * make sure we don't return more columns than asked (CASSANDRA-3303, 3395) Merged from 0.8: * acquire compactionlock during truncate (CASSANDRA-3399) * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415) Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1197426&r1=1197425&r2=1197426&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java Fri Nov 4 08:35:33 2011 @@ -31,6 +31,7 @@ import org.apache.cassandra.io.IVersione import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.service.IReadCommand; +import org.apache.cassandra.service.RepairCallback; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.utils.FBUtilities; @@ -66,7 +67,7 @@ public abstract class ReadCommand implem this.queryPath = queryPath; this.commandType = cmdType; } - + public boolean isDigestQuery() { return isDigestQuery; @@ -81,7 +82,7 @@ public abstract class ReadCommand implem { return queryPath.columnFamilyName; } - + public abstract ReadCommand copy(); public abstract Row getRow(Table table) throws IOException; @@ -95,6 +96,18 @@ public abstract class ReadCommand implem { return table; } + + // maybeGenerateRetryCommand is used to generate a retry for short reads + public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row) + { + return null; + } + + // maybeTrim removes columns from a response that is too long + public void maybeTrim(Row row) + { + // noop + } } class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1197426&r1=1197425&r2=1197426&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Fri Nov 4 08:35:33 2011 @@ -19,17 +19,25 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.service.RepairCallback; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.utils.ByteBufferUtil; import 
org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SliceFromReadCommand extends ReadCommand { + static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class); + public final ByteBuffer start, finish; public final boolean reversed; public final int count; @@ -62,6 +70,64 @@ public class SliceFromReadCommand extend } @Override + public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row) + { + int maxLiveColumns = handler.getMaxLiveColumns(); + int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount() : 0; + + assert maxLiveColumns <= count; + if ((maxLiveColumns == count) && (liveColumnsInRow < count)) + { + int retryCount = count + count - liveColumnsInRow; + return new RetriedSliceFromReadCommand(table, key, queryPath, start, finish, reversed, count, retryCount); + } + + return null; + } + + @Override + public void maybeTrim(Row row) + { + if ((row == null) || (row.cf == null)) + return; + + int liveColumnsInRow = row.cf.getLiveColumnCount(); + + if (liveColumnsInRow > getRequestedCount()) + { + int columnsToTrim = liveColumnsInRow - getRequestedCount(); + + logger.debug("trimming {} live columns to the originally requested {}", row.cf.getLiveColumnCount(), getRequestedCount()); + + Collection<IColumn> columns; + if (reversed) + columns = row.cf.getSortedColumns(); + else + columns = row.cf.getReverseSortedColumns(); + + Collection<ByteBuffer> toRemove = new HashSet<ByteBuffer>(); + + Iterator<IColumn> columnIterator = columns.iterator(); + while (columnIterator.hasNext() && (toRemove.size() < columnsToTrim)) + { + IColumn column = columnIterator.next(); + if (column.isLive()) + toRemove.add(column.name()); + } + + for (ByteBuffer columnName : toRemove) + { + row.cf.remove(columnName); + } + } + } + + protected int getRequestedCount() + { + return count; + } + + @Override public String toString() { return "SliceFromReadCommand(" + Modified: 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1197426&r1=1197425&r2=1197426&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 4 08:35:33 2011 @@ -685,7 +685,10 @@ public class StorageProxy implements Sto long startTime2 = System.currentTimeMillis(); Row row = handler.get(); if (row != null) + { + command.maybeTrim(row); rows.add(row); + } if (logger.isDebugEnabled()) logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms."); @@ -739,35 +742,21 @@ public class StorageProxy implements Sto throw new AssertionError(e); // full data requested from each node here, no digests should be sent } - // retry short reads, otherwise add the row to our resultset - if (command instanceof SliceFromReadCommand) + ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row); + if (retryCommand != null) { - // short reads are only possible on SliceFromReadCommand - SliceFromReadCommand sliceCommand = (SliceFromReadCommand) command; - int maxLiveColumns = handler.getMaxLiveColumns(); - int liveColumnsInRow = row != null ? 
row.cf.getLiveColumnCount() : 0; - - assert maxLiveColumns <= sliceCommand.count; - if ((maxLiveColumns == sliceCommand.count) && (liveColumnsInRow < sliceCommand.count)) - { - logger.debug("detected short read: expected {} columns, but only resolved {} columns", - sliceCommand.count, liveColumnsInRow); + logger.debug("issuing retry for read command"); + if (commandsToRetry == Collections.EMPTY_LIST) + commandsToRetry = new ArrayList<ReadCommand>(); + commandsToRetry.add(retryCommand); + continue; + } - int retryCount = sliceCommand.count + sliceCommand.count - liveColumnsInRow; - SliceFromReadCommand retryCommand = new SliceFromReadCommand(command.table, - command.key, - command.queryPath, - sliceCommand.start, - sliceCommand.finish, - sliceCommand.reversed, - retryCount); - if (commandsToRetry == Collections.EMPTY_LIST) - commandsToRetry = new ArrayList<ReadCommand>(); - commandsToRetry.add(retryCommand); - continue; - } + if (row != null) + { + command.maybeTrim(row); + rows.add(row); } - rows.add(row); } } } while (!commandsToRetry.isEmpty());