Author: jake Date: Fri Sep 30 17:18:50 2011 New Revision: 1177737 URL: http://svn.apache.org/viewvc?rev=1177737&view=rev Log: Thrift sockets are not properly buffered Patch my tjake; reviewed by jfarrell for CASSANDRA-3261
Added: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1177737&r1=1177736&r2=1177737&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Sep 30 17:18:50 2011 @@ -1,5 +1,6 @@ 1.0.1 * describe_ring should include datacenter/topology information (CASSANDRA-2882) + * Thrift sockets are not properly buffered (CASSANDRA-3261) 1.0.0-final Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=1177737&r1=1177736&r2=1177737&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java Fri Sep 30 17:18:50 2011 @@ -1,4 +1,5 @@ package org.apache.cassandra.thrift; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,8 +21,9 @@ package org.apache.cassandra.thrift; * */ - +import java.io.IOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; @@ -29,44 +31,79 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; /** - * Extends Thrift's TServerSocket to allow customization of various desirable - * TCP properties. + * Extends Thrift's TServerSocket to allow customization of various desirable TCP properties. */ -public class TCustomServerSocket extends TServerSocket +public class TCustomServerSocket extends TServerTransport { private static final Logger logger = LoggerFactory.getLogger(TCustomServerSocket.class); + /** + * Underlying serversocket object + */ + private ServerSocket serverSocket_ = null; + private final boolean keepAlive; private final Integer sendBufferSize; private final Integer recvBufferSize; /** * Allows fine-tuning of the server socket including keep-alive, reuse of addresses, send and receive buffer sizes. + * * @param bindAddr * @param keepAlive * @param sendBufferSize * @param recvBufferSize * @throws TTransportException */ - public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize) - throws TTransportException + public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, + Integer recvBufferSize) + throws TTransportException { - super(bindAddr); + try + { + // Make server socket + serverSocket_ = new ServerSocket(); + // Prevent 2MSL delay problem on server restarts + serverSocket_.setReuseAddress(true); + // Bind to listening port + serverSocket_.bind(bindAddr); + } + catch (IOException ioe) + { + serverSocket_ = null; + throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); + } + this.keepAlive = keepAlive; this.sendBufferSize = sendBufferSize; this.recvBufferSize = recvBufferSize; } @Override - protected TSocket acceptImpl() throws TTransportException + protected TCustomSocket acceptImpl() throws TTransportException { - TSocket tsocket = super.acceptImpl(); - Socket socket = tsocket.getSocket(); + + if (serverSocket_ == null) + throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); + + TCustomSocket tsocket = null; + Socket socket = null; + try + { + socket = serverSocket_.accept(); + tsocket = new TCustomSocket(socket); + tsocket.setTimeout(0); + } + catch (IOException iox) + { + throw new TTransportException(iox); + } try { @@ -103,4 +140,38 @@ public class TCustomServerSocket extends return tsocket; } + + @Override + public void listen() throws TTransportException + { + // Make sure not to block on accept + if (serverSocket_ != null) + { + try + { + serverSocket_.setSoTimeout(0); + } + catch (SocketException sx) + { + logger.error("Could not set socket timeout.", sx); + } + } + } + + @Override + public void close() + { + if (serverSocket_ != null) + { + try + { + serverSocket_.close(); + } + catch (IOException iox) + { + logger.warn("Could not close server socket.", iox); + } + serverSocket_ = null; + } + } } Added: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java?rev=1177737&view=auto ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java (added) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java Fri Sep 30 17:18:50 2011 @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.thrift; + + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Socket implementation of the TTransport interface. + * + * Adds socket buffering + * + */ +public class TCustomSocket extends TIOStreamTransport { + + private static final Logger LOGGER = LoggerFactory.getLogger(TCustomSocket.class.getName()); + + /** + * Wrapped Socket object + */ + private Socket socket_ = null; + + /** + * Remote host + */ + private String host_ = null; + + /** + * Remote port + */ + private int port_ = 0; + + /** + * Socket timeout + */ + private int timeout_ = 0; + + /** + * Constructor that takes an already created socket. + * + * @param socket Already created socket object + * @throws TTransportException if there is an error setting up the streams + */ + public TCustomSocket(Socket socket) throws TTransportException { + socket_ = socket; + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + } catch (SocketException sx) { + LOGGER.warn("Could not configure socket.", sx); + } + + if (isOpen()) { + try { + inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); + outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); + } catch (IOException iox) { + close(); + throw new TTransportException(TTransportException.NOT_OPEN, iox); + } + } + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param host Remote host + * @param port Remote port + */ + public TCustomSocket(String host, int port) { + this(host, port, 0); + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param host Remote host + * @param port Remote port + * @param timeout Socket timeout + */ + public TCustomSocket(String host, int port, int timeout) { + host_ = host; + port_ = port; + timeout_ = timeout; + initSocket(); + } + + /** + * Initializes the socket object + */ + private void initSocket() { + socket_ = new Socket(); + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + socket_.setSoTimeout(timeout_); + } catch (SocketException sx) { + LOGGER.error("Could not configure socket.", sx); + } + } + + /** + * Sets the socket timeout + * + * @param timeout Milliseconds timeout + */ + public void setTimeout(int timeout) { + timeout_ = timeout; + try { + socket_.setSoTimeout(timeout); + } catch (SocketException sx) { + LOGGER.warn("Could not set socket timeout.", sx); + } + } + + /** + * Returns a reference to the underlying socket. + */ + public Socket getSocket() { + if (socket_ == null) { + initSocket(); + } + return socket_; + } + + /** + * Checks whether the socket is connected. + */ + public boolean isOpen() { + if (socket_ == null) { + return false; + } + return socket_.isConnected(); + } + + /** + * Connects the socket, creating a new socket object if necessary. + */ + public void open() throws TTransportException { + if (isOpen()) { + throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); + } + + if (host_.length() == 0) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); + } + if (port_ <= 0) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port."); + } + + if (socket_ == null) { + initSocket(); + } + + try { + socket_.connect(new InetSocketAddress(host_, port_), timeout_); + inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); + outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); + } catch (IOException iox) { + close(); + throw new TTransportException(TTransportException.NOT_OPEN, iox); + } + } + + /** + * Closes the socket. + */ + public void close() { + // Close the underlying streams + super.close(); + + // Close the socket + if (socket_ != null) { + try { + socket_.close(); + } catch (IOException iox) { + LOGGER.warn("Could not close socket.", iox); + } + socket_ = null; + } + } + +} \ No newline at end of file