Author: xedin
Date: Fri Sep  2 20:23:42 2011
New Revision: 1164689

URL: http://svn.apache.org/viewvc?rev=1164689&view=rev
Log:
Streams Compression
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3015

Added:
    cassandra/trunk/lib/compress-lzf-0.8.4.jar
    cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt   (with props)
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Sep  2 20:23:42 2011
@@ -58,6 +58,7 @@
  * make the repair of a range repair all replica (CASSANDRA-2610)
  * expose the ability to repair the first range (as returned by the
    partitioner) of a node (CASSANDRA-2606)
+ * Streams Compression (CASSANDRA-3015)
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Added: cassandra/trunk/lib/compress-lzf-0.8.4.jar
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/lib/compress-lzf-0.8.4.jar?rev=1164689&view=auto
==============================================================================
Files cassandra/trunk/lib/compress-lzf-0.8.4.jar (added) and 
cassandra/trunk/lib/compress-lzf-0.8.4.jar Fri Sep  2 20:23:42 2011 differ

Added: cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt?rev=1164689&view=auto
==============================================================================
--- cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt (added)
+++ cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt Fri Sep  2 20:23:42 2011
@@ -0,0 +1,11 @@
+Copyright 2009-2010 Ning, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
\ No newline at end of file

Propchange: cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt
------------------------------------------------------------------------------
    svn:executable = *

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
Fri Sep  2 20:23:42 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.net;
 
 
 import java.io.*;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.apache.cassandra.gms.Gossiper;
@@ -78,7 +79,7 @@ public class IncomingTcpConnection exten
                 else
                 {
                     // streaming connections are per-session and have a fixed 
version.  we can't do anything with a new-version stream connection, so drop it.
-                    logger.error("Received untranslated stream from newer 
protcol version. Terminating connection!");
+                    logger.error("Received untranslated stream from newer 
protocol version. Terminating connection!");
                 }
                 // We are done with this connection....
                 return;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java 
Fri Sep  2 20:23:42 2011
@@ -18,9 +18,9 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -40,6 +40,8 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+import com.ning.compress.lzf.LZFOutputStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +60,7 @@ public class FileStreamTask extends Wrap
     // communication socket
     private Socket socket;
     // socket's output stream
-    private DataOutputStream output;
+    private OutputStream output;
     // system encryption options if any
     private final EncryptionOptions encryptionOptions;
     // allocate buffer to use for transfers only once
@@ -119,7 +121,7 @@ public class FileStreamTask extends Wrap
     private void stream() throws IOException
     {
         ByteBuffer HeaderBuffer = 
MessagingService.instance().constructStreamHeader(header, false, 
Gossiper.instance.getVersion(to));
-        // write header
+        // write header (this should not be compressed for compatibility with 
other messages)
         output.write(ByteBufferUtil.getArray(HeaderBuffer));
 
         if (header.file == null)
@@ -129,6 +131,9 @@ public class FileStreamTask extends Wrap
                                     ? 
CompressedRandomAccessReader.open(header.file.getFilename(), true)
                                     : RandomAccessReader.open(new 
File(header.file.getFilename()), CHUNK_SIZE, true);
 
+        // setting up data compression stream
+        output = new LZFOutputStream(output);
+
         try
         {
             // stream each of the required sections of the file
@@ -234,12 +239,12 @@ public class FileStreamTask extends Wrap
     protected void connect() throws IOException
     {
         socket.connect(new InetSocketAddress(to, 
DatabaseDescriptor.getStoragePort()));
-        output = new DataOutputStream(socket.getOutputStream());
+        output = socket.getOutputStream();
     }
 
     protected void close() throws IOException
     {
-        socket.close();
+        output.close();
     }
 
     public String toString()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1164689&r1=1164688&r2=1164689&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Fri Sep  2 20:23:42 2011
@@ -23,9 +23,6 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -41,6 +38,11 @@ import org.apache.cassandra.utils.ByteBu
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
+import com.ning.compress.lzf.LZFInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class IncomingStreamReader
 {
     private static final Logger logger = 
LoggerFactory.getLogger(IncomingStreamReader.class);
@@ -79,7 +81,7 @@ public class IncomingStreamReader
             assert remoteFile.estimatedKeys > 0;
             SSTableReader reader = null;
             logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
-            DataInputStream dis = new DataInputStream(socket.getInputStream());
+            DataInputStream dis = new DataInputStream(new 
LZFInputStream(socket.getInputStream()));
             try
             {
                 reader = streamIn(dis, localFile, remoteFile);


Reply via email to