Author: gdusbabek
Date: Mon Sep 27 09:04:46 2010
New Revision: 1001629

URL: http://svn.apache.org/viewvc?rev=1001629&view=rev
Log:
add progress to streams. patch by Nick Bailey, reviewed by Gary Dusbabek. 
CASSANDRA-1489

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Mon Sep 27 09:04:46 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.net;
+package org.apache.cassandra.streaming;
 
 import java.io.*;
 import java.net.InetAddress;
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -38,13 +39,15 @@ public class FileStreamTask extends Wrap
 {
     private static Logger logger = LoggerFactory.getLogger( FileStreamTask.class );
     
+    // 10MB chunks
+    public static final int CHUNK_SIZE = 10*1024*1024;
     // around 10 minutes at the default rpctimeout
     public static final int MAX_CONNECT_ATTEMPTS = 8;
 
     private final StreamHeader header;
     private final InetAddress to;
     
-    FileStreamTask(StreamHeader header, InetAddress to)
+    public FileStreamTask(StreamHeader header, InetAddress to)
     {
         this.header = header;
         this.to = to;
@@ -94,9 +97,14 @@ public class FileStreamTask extends Wrap
                 long length = section.right - section.left;
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
-                    bytesTransferred += fc.transferTo(section.left + bytesTransferred, length - bytesTransferred, channel);
+                {
+                    long toTransfer = Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+                    bytesTransferred += lastWrite;
+                    header.file.progress += lastWrite;
+                }
                 if (logger.isDebugEnabled())
-                    logger.debug("Bytes transferred " + bytesTransferred);
+                    logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
             }
         }
         finally

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Sep 27 09:04:46 2010
@@ -46,6 +46,7 @@ import org.apache.cassandra.io.util.Data
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
 import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.GuidGenerator;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Mon Sep 27 09:04:46 2010
@@ -50,6 +50,8 @@ public class IncomingStreamReader
 
         session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
         session.addFiles(header.pendingFiles);
+        // set the current file we are streaming so progress shows up in jmx
+        session.setCurrentFile(header.file);
         session.setTable(header.table);
         // pendingFile gets the new context for the local node.
         remoteFile = header.file;
@@ -82,7 +84,12 @@ public class IncomingStreamReader
                 long length = section.right - section.left;
                 long bytesRead = 0;
                 while (bytesRead < length)
-                    bytesRead += fc.transferFrom(socketChannel, offset + bytesRead, length - bytesRead);
+                {
+                    long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
+                    long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
+                    bytesRead += lastRead;
+                    remoteFile.progress += lastRead;
+                }
                 offset += length;
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Mon Sep 27 09:04:46 2010
@@ -48,6 +48,8 @@ public class PendingFile
     public final Descriptor desc;
     public final String component;
     public final List<Pair<Long,Long>> sections;
+    public final long size;
+    public long progress;
 
     public PendingFile(Descriptor desc, PendingFile pf)
     {
@@ -59,6 +61,12 @@ public class PendingFile
         this.desc = desc;
         this.component = component;
         this.sections = sections;
+        long tempSize = 0;
+        for(Pair<Long,Long> section : sections)
+        {
+            tempSize += section.right - section.left;
+        }
+        size = tempSize;
     }
 
     public String getFilename()
@@ -82,7 +90,7 @@ public class PendingFile
 
     public String toString()
     {
-        return getFilename() + "/" + StringUtils.join(sections, ",");
+        return getFilename() + "/" + StringUtils.join(sections, ",") + "\n\t progress=" + progress + "/" + size + " - " + progress*100/size + "%";
     }
 
     public static class PendingFileSerializer implements ICompactSerializer<PendingFile>

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Mon Sep 27 09:04:46 2010
@@ -49,6 +49,7 @@ public class StreamInSession
     private String table;
     private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
     private ColumnFamilyStore cfs;
+    private PendingFile current;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
     {
@@ -80,6 +81,11 @@ public class StreamInSession
         return session;
     }
 
+    public void setCurrentFile(PendingFile file)
+    {
+        this.current = file;
+    }
+
     public void setTable(String table)
     {
         this.table = table;
@@ -106,6 +112,8 @@ public class StreamInSession
         buildFutures.add(future);
 
         files.remove(remoteFile);
+        if (remoteFile.equals(current))
+            current = null;
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
         // send a StreamStatus message telling the source node it can delete this file
         MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
@@ -179,17 +187,20 @@ public class StreamInSession
     }
 
     /** query the status of incoming files. */
-    public static List<PendingFile> getIncomingFiles(InetAddress host)
+    public static Set<PendingFile> getIncomingFiles(InetAddress host)
     {
-        List<PendingFile> list = new ArrayList<PendingFile>();
+        Set<PendingFile> set = new HashSet<PendingFile>();
         for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet())
         {
             if (entry.getKey().left.equals(host))
             {
                 StreamInSession session = entry.getValue();
-                list.addAll(session.files);
+                set.addAll(session.files);
+                if(session.current != null) {
+                    set.add(session.current);
+                }
             }
         }
-        return list;
+        return set;
     }
 }


Reply via email to