Author: gdusbabek Date: Wed Jan 19 18:10:35 2011 New Revision: 1060890 URL: http://svn.apache.org/viewvc?rev=1060890&view=rev Log: configurable internode encryption. patch by rnirmal, reviewed by gdusbabek. CASSANDRA-1567
Added: cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java cassandra/trunk/src/java/org/apache/cassandra/security/ cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java cassandra/trunk/src/java/org/apache/cassandra/security/streaming/ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/conf/cassandra.yaml cassandra/trunk/src/java/org/apache/cassandra/config/Config.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java cassandra/trunk/test/conf/cassandra.yaml Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Jan 19 18:10:35 2011 @@ -3,6 +3,7 @@ * adds support for columns that act as incr/decr counters (CASSANDRA-1072, 1937, 1944) * make NetworkTopologyStrategy the default (CASSANDRA-1960) + * configurable internode encryption (CASSANDRA-1567) 0.7.1-dev Modified: cassandra/trunk/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/conf/cassandra.yaml (original) +++ cassandra/trunk/conf/cassandra.yaml Wed Jan 19 18:10:35 2011 @@ -289,6 +289,23 @@ request_scheduler: org.apache.cassandra. # the index is at the cost of space. index_interval: 128 +# Enable or disable inter-node encryption +# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that +# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher +# suite for authentication, key exchange and encryption of the actual data transfers. +# NOTE: No custom encryption options are enabled at the moment +# The available internode options are : all, none +# +# The passwords used in these options must match the passwords used when generating +# the keystore and truststore. For instructions on generating these files, see: +# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore +encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra + # Keyspaces have ColumnFamilies. (Usually 1 KS per application.) # ColumnFamilies have Rows. (Dozens of CFs per KS.) # Rows contain Columns. (Many per CF.) Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Jan 19 18:10:35 2011 @@ -100,6 +100,8 @@ public class Config public RequestSchedulerId request_scheduler_id; public RequestSchedulerOptions request_scheduler_options; + public EncryptionOptions encryption_options; + public Integer index_interval = 128; public List<RawKeyspace> keyspaces; Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jan 19 18:10:35 2011 @@ -54,7 +54,7 @@ import org.yaml.snakeyaml.TypeDescriptio import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.error.YAMLException; -public class DatabaseDescriptor +public class DatabaseDescriptor { private static Logger logger = LoggerFactory.getLogger(DatabaseDescriptor.class); @@ -1139,8 +1139,14 @@ public class DatabaseDescriptor { return conf.dynamic_snitch_badness_threshold; } + public static void setDynamicBadnessThreshold(Double dynamicBadnessThreshold) { conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold; } + + public static EncryptionOptions getEncryptionOptions() + { + return conf.encryption_options; + } } Added: cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java?rev=1060890&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java Wed Jan 19 18:10:35 2011 @@ -0,0 +1,38 @@ +package org.apache.cassandra.config; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +public class EncryptionOptions +{ + public InternodeEncryption internode_encryption = InternodeEncryption.none; + public String keystore = "conf/.keystore"; + public String keystore_password = "cassandra"; + public String truststore = "conf/.truststore"; + public String truststore_password = "cassandra"; + public String[] cipherSuites = {"TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"}; + + public static enum InternodeEncryption + { + all, + none + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 19 18:10:35 2011 @@ -27,6 +27,9 @@ import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.security.streaming.SSLIncomingStreamReader; import org.apache.cassandra.streaming.IncomingStreamReader; import org.apache.cassandra.streaming.StreamHeader; @@ -77,8 +80,7 @@ public class IncomingTcpConnection exten int size = input.readInt(); byte[] headerBytes = new byte[size]; input.readFully(headerBytes); - StreamHeader streamHeader = StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))); - new IncomingStreamReader(streamHeader, socket.getChannel()).read(); + stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input); break; } else @@ -124,4 +126,12 @@ public class IncomingTcpConnection exten logger.debug("error closing socket", e); } } + + private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException + { + if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all) + new SSLIncomingStreamReader(streamHeader, socket, input).read(); + else + new IncomingStreamReader(streamHeader, socket).read(); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 19 18:10:35 2011 @@ -46,12 +46,15 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.ILatencyPublisher; import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.net.io.SerializerType; import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.service.GCInspector; +import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.security.streaming.SSLFileStreamTask; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.FileStreamTask; import org.apache.cassandra.streaming.StreamHeader; @@ -175,16 +178,32 @@ public final class MessagingService impl * @param localEp InetAddress whose port to listen on. */ public void listen(InetAddress localEp) throws IOException - { - ServerSocketChannel serverChannel = ServerSocketChannel.open(); - final ServerSocket ss = serverChannel.socket(); - ss.setReuseAddress(true); - ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort())); - socketThread = new SocketThread(ss, "ACCEPT-" + localEp); + { + socketThread = new SocketThread(getServerSocket(localEp), "ACCEPT-" + localEp); socketThread.start(); listenGate.signalAll(); } + private ServerSocket getServerSocket(InetAddress localEp) throws IOException + { + final ServerSocket ss; + if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all) + { + ss = SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getStoragePort()); + // setReuseAddress happens in the factory. + logger_.info("Starting Encrypted Messaging Service on port {}", DatabaseDescriptor.getStoragePort()); + } + else + { + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + ss = serverChannel.socket(); + ss.setReuseAddress(true); + ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort())); + logger_.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort()); + } + return ss; + } + public void waitUntilListening() { try @@ -391,7 +410,10 @@ public final class MessagingService impl public void stream(StreamHeader header, InetAddress to) { /* Streaming asynchronously on streamExector_ threads. */ - streamExecutor_.execute(new FileStreamTask(header, to)); + if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all) + streamExecutor_.execute(new SSLFileStreamTask(header, to)); + else + streamExecutor_.execute(new FileStreamTask(header, to)); } public void register(ILatencySubscriber subcriber) Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 19 18:10:35 2011 @@ -35,6 +35,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.utils.FBUtilities; public class OutboundTcpConnection extends Thread @@ -163,7 +165,14 @@ public class OutboundTcpConnection exten try { // zero means 'bind on any available port.' - socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); + if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all) + { + socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); + } + else { + socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); + } + socket.setKeepAlive(true); socket.setTcpNoDelay(true); output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096)); Added: cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java?rev=1060890&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java Wed Jan 19 18:10:35 2011 @@ -0,0 +1,102 @@ +package org.apache.cassandra.security; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.KeyStore; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.cassandra.config.EncryptionOptions; + +/** + * A Factory for providing and setting up Client and Server SSL wrapped + * Socket and ServerSocket + */ +public final class SSLFactory +{ + private static final String PROTOCOL = "TLS"; + private static final String ALGORITHM = "SunX509"; + private static final String STORE_TYPE = "JKS"; + + + public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException + { + SSLContext ctx = createSSLContext(options); + SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket(); + serverSocket.setReuseAddress(true); + serverSocket.setEnabledCipherSuites(options.cipherSuites); + serverSocket.bind(new InetSocketAddress(address, port), 100); + return serverSocket; + } + + /** Create a socket and connect */ + public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException + { + SSLContext ctx = createSSLContext(options); + SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort); + socket.setEnabledCipherSuites(options.cipherSuites); + return socket; + } + + /** Just create a socket */ + public static SSLSocket getSocket(EncryptionOptions options) throws IOException + { + SSLContext ctx = createSSLContext(options); + SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(); + socket.setEnabledCipherSuites(options.cipherSuites); + return socket; + } + + private static SSLContext createSSLContext(EncryptionOptions options) throws IOException { + SSLContext ctx; + try { + ctx = SSLContext.getInstance(PROTOCOL); + TrustManagerFactory tmf = null; + KeyManagerFactory kmf = null; + + tmf = TrustManagerFactory.getInstance(ALGORITHM); + KeyStore ts = KeyStore.getInstance(STORE_TYPE); + ts.load(new FileInputStream(options.truststore), options.truststore_password.toCharArray()); + tmf.init(ts); + + kmf = KeyManagerFactory.getInstance(ALGORITHM); + KeyStore ks = KeyStore.getInstance(STORE_TYPE); + ks.load(new FileInputStream(options.keystore), options.keystore_password.toCharArray()); + kmf.init(ks, options.keystore_password.toCharArray()); + + ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + } catch (Exception e) { + throw new IOException("Error creating the initializing the SSL Context", e); + } + return ctx; + } +} Added: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java?rev=1060890&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java Wed Jan 19 18:10:35 2011 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.security.streaming; + +import java.io.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.streaming.FileStreamTask; +import org.apache.cassandra.streaming.StreamHeader; +import org.apache.cassandra.utils.FBUtilities; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.Pair; + +/** + * This class uses a DataOutputStream to write data as opposed to a FileChannel.transferTo + * used by FileStreamTask because the underlying SSLSocket doesn't support + * encrypting over NIO SocketChannel. + */ +public class SSLFileStreamTask extends FileStreamTask +{ + private DataOutputStream output; + private Socket socket; + + public SSLFileStreamTask(StreamHeader header, InetAddress to) + { + super(header, to); + } + + @Override + protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException + { + int toTransfer = (int)Math.min(CHUNK_SIZE, length - bytesTransferred); + fc.position(section.left + bytesTransferred); + ByteBuffer buf = ByteBuffer.allocate(toTransfer); + fc.read(buf); + buf.flip(); + output.write(buf.array(), 0, buf.limit()); + output.flush(); + return buf.limit(); + } + + @Override + protected void writeHeader(ByteBuffer buffer) throws IOException + { + output.write(buffer.array(), 0, buffer.limit()); + output.flush(); + } + + @Override + protected void bind() throws IOException + { + socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions()); + socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); + } + + @Override + protected void connect() throws IOException + { + socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); + output = new DataOutputStream(socket.getOutputStream()); + } + + @Override + protected void close() throws IOException + { + socket.close(); + } +} Added: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java?rev=1060890&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Wed Jan 19 18:10:35 2011 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.security.streaming; + +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.io.IOException; +import java.io.DataInputStream; + +import org.apache.cassandra.streaming.FileStreamTask; +import org.apache.cassandra.streaming.IncomingStreamReader; +import org.apache.cassandra.streaming.StreamHeader; + +/** + * This class uses a DataInputStream to read data as opposed to a FileChannel.transferFrom + * used by IncomingStreamReader because the underlying SSLServerSocket doesn't support + * encrypting over NIO SocketChannel. + */ +public class SSLIncomingStreamReader extends IncomingStreamReader +{ + private final DataInputStream input; + + public SSLIncomingStreamReader(StreamHeader header, Socket socket, DataInputStream input) throws IOException + { + super(header, socket); + this.input = input; + } + + @Override + protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException + { + int toRead = (int)Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead); + ByteBuffer buf = ByteBuffer.allocate(toRead); + input.readFully(buf.array()); + fc.write(buf); + bytesRead += buf.limit(); + remoteFile.progress += buf.limit(); + return bytesRead; + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Jan 19 18:10:35 2011 @@ -47,8 +47,9 @@ public class FileStreamTask extends Wrap // around 10 minutes at the default rpctimeout public static final int MAX_CONNECT_ATTEMPTS = 8; - private final StreamHeader header; - private final InetAddress to; + protected final StreamHeader header; + protected final InetAddress to; + private SocketChannel channel; public FileStreamTask(StreamHeader header, InetAddress to) { @@ -58,19 +59,18 @@ public class FileStreamTask extends Wrap public void runMayThrow() throws IOException { - SocketChannel channel = connect(); - - // successfully connected: stream. - // (at this point, if we fail, it is the receiver's job to re-request) try { - stream(channel); + connectAttempt(); + // successfully connected: stream. + // (at this point, if we fail, it is the receiver's job to re-request) + stream(); } finally { try { - channel.close(); + close(); } catch (IOException e) { @@ -82,11 +82,11 @@ public class FileStreamTask extends Wrap logger.debug("Done streaming " + header.file); } - private void stream(SocketChannel channel) throws IOException + private void stream() throws IOException { ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false); - channel.write(buffer); - assert buffer.remaining() == 0; + writeHeader(buffer); + if (header.file == null) return; @@ -101,8 +101,7 @@ public class FileStreamTask extends Wrap long bytesTransferred = 0; while (bytesTransferred < length) { - long toTransfer = Math.min(CHUNK_SIZE, length - bytesTransferred); - long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel); + long lastWrite = write(fc, section, length, bytesTransferred); bytesTransferred += lastWrite; header.file.progress += lastWrite; } @@ -116,24 +115,33 @@ public class FileStreamTask extends Wrap } } + protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException + { + long toTransfer = Math.min(CHUNK_SIZE, length - bytesTransferred); + return fc.transferTo(section.left + bytesTransferred, toTransfer, channel); + } + + protected void writeHeader(ByteBuffer buffer) throws IOException + { + channel.write(buffer); + assert buffer.remaining() == 0; + } + /** * Connects to the destination, with backoff for failed attempts. * TODO: all nodes on a cluster must currently use the same storage port * @throws IOException If all attempts fail. */ - private SocketChannel connect() throws IOException + private void connectAttempt() throws IOException { - SocketChannel channel = SocketChannel.open(); - // force local binding on correctly specified interface. - channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); + bind(); int attempts = 0; while (true) { try { - channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); - // success - return channel; + connect(); + break; } catch (IOException e) { @@ -153,4 +161,21 @@ public class FileStreamTask extends Wrap } } } + + protected void bind() throws IOException + { + channel = SocketChannel.open(); + // force local binding on correctly specified interface. + channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); + } + + protected void connect() throws IOException + { + channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); + } + + protected void close() throws IOException + { + channel.close(); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jan 19 18:10:35 2011 @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; @@ -35,16 +36,15 @@ public class IncomingStreamReader { private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class); - private final PendingFile localFile; - private final PendingFile remoteFile; + protected final PendingFile localFile; + protected final PendingFile remoteFile; private final SocketChannel socketChannel; - private final StreamInSession session; + protected final StreamInSession session; - public IncomingStreamReader(StreamHeader header, SocketChannel socketChannel) throws IOException + public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException { - this.socketChannel = socketChannel; - InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress(); - + this.socketChannel = socket.getChannel(); + InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress(); session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId); session.addFiles(header.pendingFiles); // set the current file we are streaming so progress shows up in jmx @@ -63,7 +63,7 @@ public class IncomingStreamReader session.closeIfFinished(); } - private void readFile() throws IOException + protected void readFile() throws IOException { if (logger.isDebugEnabled()) { @@ -82,10 +82,7 @@ public class IncomingStreamReader long bytesRead = 0; while (bytesRead < length) { - long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead); - long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead); - bytesRead += lastRead; - remoteFile.progress += lastRead; + bytesRead = readnwrite(length, bytesRead, offset, fc); } offset += length; } @@ -106,4 +103,13 @@ public class IncomingStreamReader session.finished(remoteFile, localFile); } + + protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException + { + long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead); + long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead); + bytesRead += lastRead; + remoteFile.progress += lastRead; + return bytesRead; + } } Modified: cassandra/trunk/test/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1060890&r1=1060889&r2=1060890&view=diff ============================================================================== --- cassandra/trunk/test/conf/cassandra.yaml (original) +++ cassandra/trunk/test/conf/cassandra.yaml Wed Jan 19 18:10:35 2011 @@ -26,6 +26,12 @@ endpoint_snitch: org.apache.cassandra.lo dynamic_snitch: true request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler request_scheduler_id: keyspace +encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra keyspaces: - name: Keyspace1 replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy