Fix wrong progress when streaming uncompressed

patch by yukim; reviewed by Josh McKenzie for CASSANDRA-7878


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3db38d7e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3db38d7e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3db38d7e

Branch: refs/heads/cassandra-2.1
Commit: 3db38d7ed12960657cd5a79374c4ef28d6b9a966
Parents: b1166c0
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 4 11:27:39 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Sep 26 12:22:44 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamWriter.java       | 22 ++++++++++----------
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db38d7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00603f3..40e2f2c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@
  * Make disruptor_thrift_server invocation pool configurable (CASSANDRA-7594)
  * Make repair no-op when RF=1 (CASSANDRA-7864)
  * Fix NPE when table dropped during streaming (CASSANDRA-7946)
+ * Fix wrong progress when streaming uncompressed (CASSANDRA-7878)
 Merged from 1.2:
  * Don't index tombstones (CASSANDRA-7828)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db38d7e/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 5a5163f..43bc26a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -87,7 +87,7 @@ public class StreamWriter
             for (Pair<Long, Long> section : sections)
             {
                 long start = validator == null ? section.left : validator.chunkStart(section.left);
-                int skipBytes = (int) (section.left - start);
+                int readOffset = (int) (section.left - start);
                 // seek to the beginning of the section
                 file.seek(start);
                 if (validator != null)
@@ -96,14 +96,14 @@ public class StreamWriter
                 // length of the section to read
                 long length = section.right - start;
                 // tracks write progress
-                long bytesTransferred = 0;
-                while (bytesTransferred < length)
+                long bytesRead = 0;
+                while (bytesRead < length)
                 {
-                    long lastWrite = write(file, validator, skipBytes, length, bytesTransferred);
-                    bytesTransferred += lastWrite;
-                    progress += lastWrite;
+                    long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
+                    bytesRead += lastBytesRead;
+                    progress += (lastBytesRead - readOffset);
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
-                    skipBytes = 0;
+                    readOffset = 0;
                 }
 
                 // make sure that current section is send
@@ -132,10 +132,10 @@ public class StreamWriter
      * @param reader The file reader to read from
      * @param validator validator to verify data integrity
      * @param start number of bytes to skip transfer, but include for validation.
-     * @param length The full length that should be transferred
-     * @param bytesTransferred Number of bytes remaining to transfer
+     * @param length The full length that should be read from {@code reader}
+     * @param bytesTransferred Number of bytes already read out of {@code length}
      *
-     * @return Number of bytes transferred
+     * @return Number of bytes read
      *
      * @throws java.io.IOException on any I/O error
      */
@@ -148,7 +148,7 @@ public class StreamWriter
         if (validator != null)
             validator.validate(transferBuffer, 0, minReadable);
 
-        limiter.acquire(toTransfer);
+        limiter.acquire(toTransfer - start);
         compressedOutput.write(transferBuffer, start, (toTransfer - start));
 
         return toTransfer;

Reply via email to