Author: xedin Date: Fri Sep 2 20:23:42 2011 New Revision: 1164689 URL: http://svn.apache.org/viewvc?rev=1164689&view=rev Log: Streams Compression patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3015
Added: cassandra/trunk/lib/compress-lzf-0.8.4.jar cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt (with props) Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164689&r1=1164688&r2=1164689&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Sep 2 20:23:42 2011 @@ -58,6 +58,7 @@ * make the repair of a range repair all replica (CASSANDRA-2610) * expose the ability to repair the first range (as returned by the partitioner) of a node (CASSANDRA-2606) + * Streams Compression (CASSANDRA-3015) 0.8.5 * fix NPE when encryption_options is unspecified (CASSANDRA-3007) Added: cassandra/trunk/lib/compress-lzf-0.8.4.jar URL: http://svn.apache.org/viewvc/cassandra/trunk/lib/compress-lzf-0.8.4.jar?rev=1164689&view=auto ============================================================================== Files cassandra/trunk/lib/compress-lzf-0.8.4.jar (added) and cassandra/trunk/lib/compress-lzf-0.8.4.jar Fri Sep 2 20:23:42 2011 differ Added: cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt?rev=1164689&view=auto ============================================================================== --- cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt (added) +++ cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt Fri Sep 2 20:23:42 2011 @@ -0,0 +1,11 @@ +Copyright 2009-2010 Ning, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you may not +use this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS,WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations under +the License. \ No newline at end of file Propchange: cassandra/trunk/lib/licenses/compress-lzf-0.8.4.txt ------------------------------------------------------------------------------ svn:executable = * Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1164689&r1=1164688&r2=1164689&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Fri Sep 2 20:23:42 2011 @@ -22,6 +22,7 @@ package org.apache.cassandra.net; import java.io.*; +import java.net.InetSocketAddress; import java.net.Socket; import org.apache.cassandra.gms.Gossiper; @@ -78,7 +79,7 @@ public class IncomingTcpConnection exten else { // streaming connections are per-session and have a fixed version. we can't do anything with a new-version stream connection, so drop it. - logger.error("Received untranslated stream from newer protcol version. Terminating connection!"); + logger.error("Received untranslated stream from newer protocol version. Terminating connection!"); } // We are done with this connection.... return; Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1164689&r1=1164688&r2=1164689&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Fri Sep 2 20:23:42 2011 @@ -18,9 +18,9 @@ package org.apache.cassandra.streaming; -import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -40,6 +40,8 @@ import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throttle; import org.apache.cassandra.utils.WrappedRunnable; +import com.ning.compress.lzf.LZFOutputStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +60,7 @@ public class FileStreamTask extends Wrap // communication socket private Socket socket; // socket's output stream - private DataOutputStream output; + private OutputStream output; // system encryption options if any private final EncryptionOptions encryptionOptions; // allocate buffer to use for transfers only once @@ -119,7 +121,7 @@ public class FileStreamTask extends Wrap private void stream() throws IOException { ByteBuffer HeaderBuffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to)); - // write header + // write header (this should not be compressed for compatibility with other messages) output.write(ByteBufferUtil.getArray(HeaderBuffer)); if (header.file == null) @@ -129,6 +131,9 @@ public class FileStreamTask extends Wrap ? CompressedRandomAccessReader.open(header.file.getFilename(), true) : RandomAccessReader.open(new File(header.file.getFilename()), CHUNK_SIZE, true); + // setting up data compression stream + output = new LZFOutputStream(output); + try { // stream each of the required sections of the file @@ -234,12 +239,12 @@ public class FileStreamTask extends Wrap protected void connect() throws IOException { socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); - output = new DataOutputStream(socket.getOutputStream()); + output = socket.getOutputStream(); } protected void close() throws IOException { - socket.close(); + output.close(); } public String toString() Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1164689&r1=1164688&r2=1164689&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Fri Sep 2 20:23:42 2011 @@ -23,9 +23,6 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collections; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; @@ -41,6 +38,11 @@ import org.apache.cassandra.utils.ByteBu import org.apache.cassandra.utils.BytesReadTracker; import org.apache.cassandra.utils.Pair; +import com.ning.compress.lzf.LZFInputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class IncomingStreamReader { private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class); @@ -79,7 +81,7 @@ public class IncomingStreamReader assert remoteFile.estimatedKeys > 0; SSTableReader reader = null; logger.debug("Estimated keys {}", remoteFile.estimatedKeys); - DataInputStream dis = new DataInputStream(socket.getInputStream()); + DataInputStream dis = new DataInputStream(new LZFInputStream(socket.getInputStream())); try { reader = streamIn(dis, localFile, remoteFile);