Author: jbellis
Date: Fri Aug  5 15:43:58 2011
New Revision: 1154274

URL: http://svn.apache.org/viewvc?rev=1154274&view=rev
Log:
fix tracker getting out of sync with underlying data source
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
 Fri Aug  5 15:43:58 2011
@@ -21,11 +21,7 @@ package org.apache.cassandra.io.sstable;
  */
 
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +40,6 @@ public class SSTableIdentityIterator imp
     private static final Logger logger = 
LoggerFactory.getLogger(SSTableIdentityIterator.class);
 
     private final DecoratedKey key;
-    private final long finishedAt;
     private final DataInput input;
     private final long dataStart;
     public final long dataSize;
@@ -110,7 +105,6 @@ public class SSTableIdentityIterator imp
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
         this.fromRemote = fromRemote;
         this.validateColumns = checkData;
-        finishedAt = dataStart + dataSize;
 
         try
         {
@@ -118,6 +112,9 @@ public class SSTableIdentityIterator imp
             {
                 RandomAccessReader file = (RandomAccessReader) input;
                 file.seek(this.dataStart);
+                if (dataStart + dataSize > file.length())
+                    throw new IOException(String.format("dataSize of %s 
starting at %s would be larger than file %s length %s",
+                                          dataSize, dataStart, file.getPath(), 
file.length()));
                 if (checkData)
                 {
                     try
@@ -141,6 +138,7 @@ public class SSTableIdentityIterator imp
                         logger.debug("Invalid row summary in {}; will rebuild 
it", sstable);
                     }
                     file.seek(this.dataStart);
+                    inputWithTracker.reset(0);
                 }
             }
 
@@ -150,11 +148,7 @@ public class SSTableIdentityIterator imp
             
ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, 
inputWithTracker);
             columnCount = inputWithTracker.readInt();
 
-            if (input instanceof RandomAccessReader)
-            {
-                RandomAccessReader file = (RandomAccessReader) input;
-                columnPosition = file.getFilePointer();
-            }
+            columnPosition = dataStart + inputWithTracker.getBytesRead();
         }
         catch (IOException e)
         {
@@ -174,15 +168,7 @@ public class SSTableIdentityIterator imp
 
     public boolean hasNext()
     {
-        if (input instanceof RandomAccessReader)
-        {
-            RandomAccessReader file = (RandomAccessReader) input;
-            return file.getFilePointer() < finishedAt;
-        }
-        else
-        {
-            return inputWithTracker.getBytesRead() < dataSize;
-        }
+        return inputWithTracker.getBytesRead() < dataSize;
     }
 
     public IColumn next()
@@ -230,36 +216,21 @@ public class SSTableIdentityIterator imp
 
     public void echoData(DataOutput out) throws IOException
     {
-        // only effective when input is from file
-        if (input instanceof RandomAccessReader)
-        {
-            RandomAccessReader file = (RandomAccessReader) input;
-            file.seek(dataStart);
-            while (file.getFilePointer() < finishedAt)
-            {
-                out.write(file.readByte());
-            }
-        }
-        else
-        {
+        if (!(input instanceof RandomAccessReader))
             throw new UnsupportedOperationException();
-        }
+
+        ((RandomAccessReader) input).seek(dataStart);
+        inputWithTracker.reset(0);
+        while (inputWithTracker.getBytesRead() < dataSize)
+            out.write(inputWithTracker.readByte());
     }
 
     public ColumnFamily getColumnFamilyWithColumns() throws IOException
     {
+        assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        if (input instanceof RandomAccessReader)
-        {
-            RandomAccessReader file = (RandomAccessReader) input;
-            file.seek(columnPosition - 4); // seek to before column count int
-            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, 
false, fromRemote);
-        }
-        else
-        {
-            // since we already read column count, just pass that value and 
continue deserialization
-            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, 
columnCount, false, fromRemote);
-        }
+        // since we already read column count, just pass that value and 
continue deserialization
+        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, 
columnCount, false, fromRemote);
         if (validateColumns)
         {
             try
@@ -274,6 +245,11 @@ public class SSTableIdentityIterator imp
         return cf;
     }
 
+    private long headerSize()
+    {
+        return columnPosition - dataStart;
+    }
+
     public int compareTo(SSTableIdentityIterator o)
     {
         return key.compareTo(o.key);
@@ -281,23 +257,18 @@ public class SSTableIdentityIterator imp
 
     public void reset()
     {
-        // only effective when input is from file
-        if (input instanceof RandomAccessReader)
+        if (!(input instanceof RandomAccessReader))
+            throw new UnsupportedOperationException();
+
+        RandomAccessReader file = (RandomAccessReader) input;
+        try
         {
-            RandomAccessReader file = (RandomAccessReader) input;
-            try
-            {
-                file.seek(columnPosition);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-            inputWithTracker.reset();
+            file.seek(columnPosition);
         }
-        else
+        catch (IOException e)
         {
-            throw new UnsupportedOperationException();
+            throw new IOError(e);
         }
+        inputWithTracker.reset(headerSize());
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Fri Aug  5 15:43:58 2011
@@ -117,7 +117,7 @@ public class IncomingStreamReader
                 long bytesRead = 0;
                 while (bytesRead < length)
                 {
-                    in.reset();
+                    in.reset(0);
                     key = 
SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, 
ByteBufferUtil.readWithShortLength(in));
                     long dataSize = SSTableReader.readRowSize(in, 
localFile.desc);
                     ColumnFamily cf = null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java 
Fri Aug  5 15:43:58 2011
@@ -40,13 +40,13 @@ public class BytesReadTracker implements
     {
         return bytesRead;
     }
-    
+
     /**
-     * reset counter to 0
+     * reset counter to @param count
      */
-    public void reset()
+    public void reset(long count)
     {
-        bytesRead = 0;
+        bytesRead = count;
     }
 
     public boolean readBoolean() throws IOException

Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java 
Fri Aug  5 15:43:58 2011
@@ -115,7 +115,7 @@ public class BytesReadTrackerTest
             dis.close();
         }
 
-        tracker.reset();
+        tracker.reset(0);
         assertEquals(0, tracker.getBytesRead());
     }
 
@@ -152,6 +152,8 @@ public class BytesReadTrackerTest
             int s = tracker.readUnsignedShort();
             assertEquals(1, s);
             assertEquals(3, tracker.getBytesRead());
+
+            assertEquals(testData.length, tracker.getBytesRead());
         }
         finally
         {
@@ -185,6 +187,8 @@ public class BytesReadTrackerTest
             tracker.readFully(out);
             assertEquals("890", new String(out));
             assertEquals(10, tracker.getBytesRead());
+
+            assertEquals(testData.length, tracker.getBytesRead());
         }
         finally
         {


Reply via email to