Author: jbellis Date: Fri Aug 5 15:43:58 2011 New Revision: 1154274 URL: http://svn.apache.org/viewvc?rev=1154274&view=rev Log: fix tracker getting out of sync with underlying data source patch by jbellis; reviewed by slebresne for CASSANDRA-2901
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1154274&r1=1154273&r2=1154274&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Fri Aug 5 15:43:58 2011 @@ -21,11 +21,7 @@ package org.apache.cassandra.io.sstable; */ -import java.io.DataInput; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.IOError; -import java.io.IOException; +import java.io.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +40,6 @@ public class SSTableIdentityIterator imp private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class); private final DecoratedKey key; - private final long finishedAt; private final DataInput input; private final long dataStart; public final long dataSize; @@ -110,7 +105,6 @@ public class SSTableIdentityIterator imp this.expireBefore = (int)(System.currentTimeMillis() / 1000); this.fromRemote = fromRemote; this.validateColumns = checkData; - finishedAt = dataStart + dataSize; try { @@ -118,6 +112,9 @@ public class SSTableIdentityIterator imp { RandomAccessReader file = (RandomAccessReader) input; file.seek(this.dataStart); + if (dataStart + dataSize > file.length()) + throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s", + dataSize, dataStart, file.getPath(), file.length())); if (checkData) { try @@ -141,6 +138,7 @@ public class SSTableIdentityIterator imp logger.debug("Invalid row summary in {}; will rebuild it", sstable); } file.seek(this.dataStart); + inputWithTracker.reset(0); } } @@ -150,11 +148,7 @@ public class SSTableIdentityIterator imp ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker); columnCount = inputWithTracker.readInt(); - if (input instanceof RandomAccessReader) - { - RandomAccessReader file = (RandomAccessReader) input; - columnPosition = file.getFilePointer(); - } + columnPosition = dataStart + inputWithTracker.getBytesRead(); } catch (IOException e) { @@ -174,15 +168,7 @@ public class SSTableIdentityIterator imp public boolean hasNext() { - if (input instanceof RandomAccessReader) - { - RandomAccessReader file = (RandomAccessReader) input; - return file.getFilePointer() < finishedAt; - } - else - { - return inputWithTracker.getBytesRead() < dataSize; - } + return inputWithTracker.getBytesRead() < dataSize; } public IColumn next() @@ -230,36 +216,21 @@ public class SSTableIdentityIterator imp public void echoData(DataOutput out) throws IOException { - // only effective when input is from file - if (input instanceof RandomAccessReader) - { - RandomAccessReader file = (RandomAccessReader) input; - file.seek(dataStart); - while (file.getFilePointer() < finishedAt) - { - out.write(file.readByte()); - } - } - else - { + if (!(input instanceof RandomAccessReader)) throw new UnsupportedOperationException(); - } + + ((RandomAccessReader) input).seek(dataStart); + inputWithTracker.reset(0); + while (inputWithTracker.getBytesRead() < dataSize) + out.write(inputWithTracker.readByte()); } public ColumnFamily getColumnFamilyWithColumns() throws IOException { + assert inputWithTracker.getBytesRead() == headerSize(); ColumnFamily cf = columnFamily.cloneMeShallow(); - if (input instanceof RandomAccessReader) - { - RandomAccessReader file = (RandomAccessReader) input; - file.seek(columnPosition - 4); // seek to before column count int - ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote); - } - else - { - // since we already read column count, just pass that value and continue deserialization - ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote); - } + // since we already read column count, just pass that value and continue deserialization + ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote); if (validateColumns) { try @@ -274,6 +245,11 @@ public class SSTableIdentityIterator imp return cf; } + private long headerSize() + { + return columnPosition - dataStart; + } + public int compareTo(SSTableIdentityIterator o) { return key.compareTo(o.key); @@ -281,23 +257,18 @@ public class SSTableIdentityIterator imp public void reset() { - // only effective when input is from file - if (input instanceof RandomAccessReader) + if (!(input instanceof RandomAccessReader)) + throw new UnsupportedOperationException(); + + RandomAccessReader file = (RandomAccessReader) input; + try { - RandomAccessReader file = (RandomAccessReader) input; - try - { - file.seek(columnPosition); - } - catch (IOException e) - { - throw new IOError(e); - } - inputWithTracker.reset(); + file.seek(columnPosition); } - else + catch (IOException e) { - throw new UnsupportedOperationException(); + throw new IOError(e); } + inputWithTracker.reset(headerSize()); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1154274&r1=1154273&r2=1154274&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Fri Aug 5 15:43:58 2011 @@ -117,7 +117,7 @@ public class IncomingStreamReader long bytesRead = 0; while (bytesRead < length) { - in.reset(); + in.reset(0); key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in)); long dataSize = SSTableReader.readRowSize(in, localFile.desc); ColumnFamily cf = null; Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java?rev=1154274&r1=1154273&r2=1154274&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java Fri Aug 5 15:43:58 2011 @@ -40,13 +40,13 @@ public class BytesReadTracker implements { return bytesRead; } - + /** - * reset counter to 0 + * reset counter to @param count */ - public void reset() + public void reset(long count) { - bytesRead = 0; + bytesRead = count; } public boolean readBoolean() throws IOException Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java?rev=1154274&r1=1154273&r2=1154274&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java Fri Aug 5 15:43:58 2011 @@ -115,7 +115,7 @@ public class BytesReadTrackerTest dis.close(); } - tracker.reset(); + tracker.reset(0); assertEquals(0, tracker.getBytesRead()); } @@ -152,6 +152,8 @@ public class BytesReadTrackerTest int s = tracker.readUnsignedShort(); assertEquals(1, s); assertEquals(3, tracker.getBytesRead()); + + assertEquals(testData.length, tracker.getBytesRead()); } finally { @@ -185,6 +187,8 @@ public class BytesReadTrackerTest tracker.readFully(out); assertEquals("890", new String(out)); assertEquals(10, tracker.getBytesRead()); + + assertEquals(testData.length, tracker.getBytesRead()); } finally {