Author: aidan Date: Tue Aug 18 16:16:10 2009 New Revision: 805477 URL: http://svn.apache.org/viewvc?rev=805477&view=rev Log: QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver implementation that uses MINA.
Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java - copied, changed from r805454, qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java Removed: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=805477&r1=805476&r2=805477&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original) +++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Aug 18 16:16:10 2009 @@ -31,6 +31,7 @@ import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.thread.QpidThreadExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import java.net.SocketAddress; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Receiver; + +/** + * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received + * decodes it and then process the result. + */ +public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> +{ + // Sets the network driver providing data for this ProtocolEngine + void setNetworkDriver (NetworkDriver driver); + + // Returns the remote address of the NetworkDriver + SocketAddress getRemoteAddress(); + + // Returns number of bytes written + long getWrittenBytes(); + + // Returns number of bytes read + long getReadBytes(); + + // Called by the NetworkDriver when the socket has been closed for reading + void closed(); + + // Called when the NetworkEngine has not written data for the specified period of time (will trigger a + // heartbeat) + void writerIdle(); + + // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) + void readerIdle(); + + /** + * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and + * passes the data onto the NetworkDriver for sending + */ + void writeFrame(AMQDataBlock frame); +} \ No newline at end of file Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +public interface ProtocolEngineFactory +{ + + // Returns a new instance of a ProtocolEngine + ProtocolEngine newProtocolEngine(); + +} \ No newline at end of file Copied: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (from r805454, qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java) URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?p2=qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java&p1=qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java&r1=805454&r2=805477&rev=805477&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Tue Aug 18 16:16:10 2009 @@ -1,4 +1,25 @@ -package org.apache.qpid.client.transport; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.thread; import org.apache.qpid.thread.Threading; Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.SocketAddress; + +import javax.net.ssl.SSLEngine; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; + +public interface NetworkDriver extends Sender<java.nio.ByteBuffer> +{ + // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to + // it using the SSLEngine if provided + void open(int port, InetAddress destination, ProtocolEngine engine, + NetworkDriverConfiguration config, SSLEngine sslEngine) + throws OpenException; + + // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which + // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided + void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; + + // Returns the remote address of underlying socket + SocketAddress getRemoteAddress(); + + /** + * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been + * read in seconds + */ + void setMaxReadIdle(int idleTime); + + /** + * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been + * written in seconds + */ + void setMaxWriteIdle(int idleTime); + +} \ No newline at end of file Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +/** + * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing + * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned + * from here if the underlying implementation supports them. + */ +public interface NetworkDriverConfiguration +{ + // Taken from Socket + boolean getKeepAlive(); + boolean getOOBInline(); + boolean getReuseAddress(); + Integer getSoLinger(); // null means off + int getSoTimeout(); + boolean getTcpNoDelay(); + int getTrafficClass(); + + // The amount of memory in bytes to allocate to the incoming buffer + int getReceiveBufferSize(); + + // The amount of memory in bytes to allocate to the outgoing buffer + int getSendBufferSize(); +} Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport; + +public class OpenException extends Exception +{ + + public OpenException(String string, Throwable lastException) + { + super(string, lastException); + } + +} Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,379 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLEngine; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.SessionUtil; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.OpenException; + +public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver +{ + + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + + ProtocolEngine _protocolEngine; + private boolean _useNIO = false; + private int _processors = 4; + private boolean _executorPool = false; + private SSLContextFactory _sslFactory = null; + private SocketConnector _socketConnector; + private IoAcceptor _acceptor; + private IoSession _ioSession; + private ProtocolEngineFactory _factory; + private boolean _protectIO; + private NetworkDriverConfiguration _config; + private Throwable _lastException; + private boolean _acceptingConnections = false; + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + } + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO, + ProtocolEngine protocolEngine, IoSession session) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + _protocolEngine = protocolEngine; + _ioSession = session; + } + + public MINANetworkDriver() + { + + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + _factory = factory; + _config = config; + + if (_useNIO) + { + _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, + new NewThreadExecutor()); + } + else + { + _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); + } + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + + if (config != null) + { + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getSendBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); + } + + // if we do not use the executor pool threading model we get the default + // leader follower + // implementation provided by MINA + if (_executorPool) + { + sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + + if (addresses != null && addresses.length > 0) + { + for (InetAddress addr : addresses) + { + try + { + _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to {0}:{2}", addr, port)); + } + } + } + else + { + try + { + _acceptor.bind(new InetSocketAddress(port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to *:{1}", port)); + } + } + _acceptingConnections = true; + } + + public SocketAddress getRemoteAddress() + { + return _ioSession.getRemoteAddress(); + } + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + SSLEngine sslEngine) throws OpenException + { + if (_useNIO) + { + _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); + } + else + { + _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking + // connector + } + + org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + // the MINA default is currently to use the pooled allocator although this may change in future + // once more testing of the performance of the simple allocator has been done + if (!Boolean.getBoolean("amqj.enablePooledAllocator")) + { + org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + + SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); + + // if we do not use our own thread model we get the MINA default which is to use + // its own leader-follower model + boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); + if (readWriteThreading) + { + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); + scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); + scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); + + // Don't have the connector's worker thread wait around for other + // connections (we only use + // one SocketConnector per connection at the moment anyway). This allows + // short-running + // clients (like unit tests) to complete quickly. + _socketConnector.setWorkerTimeout(0); + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); + future.join(); + if (!future.isConnected()) + { + throw new OpenException("Could not open connection", _lastException); + } + _ioSession = future.getSession(); + ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession); + ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession); + _ioSession.setAttachment(engine); + engine.setNetworkDriver(this); + _protocolEngine = engine; + } + + public void setMaxReadIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); + } + + public void setMaxWriteIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); + } + + public void close() + { + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + if (_ioSession != null) + { + _ioSession.close(); + } + } + + public void flush() + { + // MINA doesn't support flush + } + + public void send(ByteBuffer msg) + { + WriteFuture future = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg)); + future.join(); + } + + public void setIdleTimeout(long l) + { + // MINA doesn't support setting SO_TIMEOUT + } + + public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception + { + if (_protocolEngine != null) + { + _protocolEngine.exception(throwable); + } + _lastException = throwable; + } + + /** + * Invoked when a message is received on a particular protocol session. Note + * that a protocol session is directly tied to a particular physical + * connection. + * + * @param protocolSession + * the protocol session that received the message + * @param message + * the message itself (i.e. a decoded frame) + * + * @throws Exception + * if the message cannot be processed + */ + public void messageReceived(IoSession protocolSession, Object message) throws Exception + { + if (message instanceof org.apache.mina.common.ByteBuffer) + { + ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); + } + else + { + throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); + } + } + + public void sessionClosed(IoSession protocolSession) throws Exception + { + ((ProtocolEngine) protocolSession.getAttachment()).closed(); + } + + public void sessionCreated(IoSession protocolSession) throws Exception + { + if (_acceptingConnections) + { + // Configure the session with SSL if necessary + SessionUtil.initialize(protocolSession); + if (_executorPool) + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + else + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + + // Do we want to have read/write buffer limits? + if (_protectIO) + { + //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + } + + // Set up the protocol engine + ProtocolEngine protocolEngine = _factory.newProtocolEngine(); + MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); + protocolEngine.setNetworkDriver(newDriver); + protocolSession.setAttachment(protocolEngine); + } + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (IdleStatus.WRITER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).writerIdle(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).readerIdle(); + } + } + + private ProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + +} Added: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java?rev=805477&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java Tue Aug 18 16:16:10 2009 @@ -0,0 +1,473 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.OpenException; + +public class MINANetworkDriverTest extends TestCase +{ + + private static final String TEST_DATA = "YHALOTHAR"; + private static final int TEST_PORT = 2323; + private NetworkDriver _server; + private NetworkDriver _client; + private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read + private Exception _thrownEx; + + @Override + public void setUp() + { + _server = new MINANetworkDriver(); + _client = new MINANetworkDriver(); + _thrownEx = null; + _countingEngine = new CountingProtocolEngine(); + } + + @Override + public void tearDown() + { + if (_server != null) + { + _server.close(); + } + + if (_client != null) + { + _client.close(); + } + } + + /** + * Tests that a socket can't be opened if a driver hasn't been bound + * to the port and can be opened if a driver has been bound. + * @throws BindException + * @throws UnknownHostException + * @throws OpenException + */ + public void testBindOpen() throws BindException, UnknownHostException, OpenException + { + try + { + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + } + catch (OpenException e) + { + _thrownEx = e; + } + + assertNotNull("Open should have failed since no engine bound", _thrownEx); + + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + } + + /** + * Tests that a socket can't be opened after a bound NetworkDriver has been closed + * @throws BindException + * @throws UnknownHostException + * @throws OpenException + */ + public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException + { + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.close(); + _server.close(); + + try + { + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + } + catch (OpenException e) + { + _thrownEx = e; + } + assertNotNull("Open should have failed", _thrownEx); + } + + /** + * Checks that the right exception is thrown when binding a NetworkDriver to an already + * existing socket. + */ + public void testBindPortInUse() + { + try + { + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + } + catch (BindException e) + { + fail("First bind should not fail"); + } + + try + { + _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + } + catch (BindException e) + { + _thrownEx = e; + } + assertNotNull("Second bind should throw BindException", _thrownEx); + } + + /** + * tests that bytes sent on a network driver are received at the other end + * + * @throws UnknownHostException + * @throws OpenException + * @throws InterruptedException + * @throws BindException + */ + public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException + { + // Open a connection from a counting engine to an echo engine + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + + // Tell the counting engine how much data we're sending + _countingEngine.setNewLatch(TEST_DATA.getBytes().length); + + // Send the data and wait for up to 2 seconds to get it back + _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _countingEngine.getLatch().await(2, TimeUnit.SECONDS); + + // Check what we got + assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes()); + } + + /** + * Opens a connection with a low read idle and check that it gets triggered + * @throws BindException + * @throws OpenException + * @throws UnknownHostException + * + */ + public void testSetReadIdle() throws BindException, UnknownHostException, OpenException + { + // Open a connection from a counting engine to an echo engine + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); + _client.setMaxReadIdle(1); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + // Eat it + } + assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); + } + + /** + * Opens a connection with a low write idle and check that it gets triggered + * @throws BindException + * @throws OpenException + * @throws UnknownHostException + * + */ + public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException + { + // Open a connection from a counting engine to an echo engine + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); + _client.setMaxWriteIdle(1); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + // Eat it + } + assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); + } + + + /** + * Creates and then closes a connection from client to server and checks that the server + * has its closed() method called. Then creates a new client and closes the server to check + * that the client has its closed() method called. + * @throws BindException + * @throws UnknownHostException + * @throws OpenException + */ + public void testClosed() throws BindException, UnknownHostException, OpenException + { + // Open a connection from a counting engine to an echo engine + EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); + _server.bind(TEST_PORT, null, factory, null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + EchoProtocolEngine serverEngine = null; + while (serverEngine == null) + { + serverEngine = factory.getEngine(); + if (serverEngine == null) + { + try + { + Thread.sleep(10); + } + catch (InterruptedException e) + { + } + } + } + assertFalse("Server should not have been closed", serverEngine.getClosed()); + serverEngine.setNewLatch(1); + _client.close(); + try + { + serverEngine.getLatch().await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + } + assertTrue("Server should have been closed", serverEngine.getClosed()); + + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _countingEngine.setClosed(false); + assertFalse("Client should not have been closed", _countingEngine.getClosed()); + _countingEngine.setNewLatch(1); + _server.close(); + try + { + _countingEngine.getLatch().await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + } + assertTrue("Client should have been closed", _countingEngine.getClosed()); + } + + /** + * Create a connection and instruct the client to throw an exception when it gets some data + * and that the latch gets counted down. + * @throws BindException + * @throws UnknownHostException + * @throws OpenException + * @throws InterruptedException + */ + public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException + { + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + + + assertEquals("Exception should not have been thrown", 1, + _countingEngine.getExceptionLatch().getCount()); + _countingEngine.setErrorOnNextRead(true); + _countingEngine.setNewLatch(TEST_DATA.getBytes().length); + _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); + assertEquals("Exception should not been thrown", 0, + _countingEngine.getExceptionLatch().getCount()); + } + + /** + * Opens a connection and checks that the remote address is the one that was asked for + * @throws BindException + * @throws UnknownHostException + * @throws OpenException + */ + public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException + { + _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT), + _client.getRemoteAddress()); + } + + private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory + { + EchoProtocolEngine _engine = null; + + public ProtocolEngine newProtocolEngine() + { + if (_engine == null) + { + _engine = new EchoProtocolEngine(); + } + return getEngine(); + } + + public EchoProtocolEngine getEngine() + { + return _engine; + } + } + + public class CountingProtocolEngine implements ProtocolEngine + { + + protected NetworkDriver _driver; + public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>(); + private int _readBytes; + private CountDownLatch _latch = new CountDownLatch(0); + private boolean _readerHasBeenIdle; + private boolean _writerHasBeenIdle; + private boolean _closed = false; + private boolean _nextReadErrors = false; + private CountDownLatch _exceptionLatch = new CountDownLatch(1); + + public void closed() + { + setClosed(true); + _latch.countDown(); + } + + public void setErrorOnNextRead(boolean b) + { + _nextReadErrors = b; + } + + public void setNewLatch(int length) + { + _latch = new CountDownLatch(length); + } + + public long getReadBytes() + { + return _readBytes; + } + + public SocketAddress getRemoteAddress() + { + if (_driver != null) + { + return _driver.getRemoteAddress(); + } + else + { + return null; + } + } + + public long getWrittenBytes() + { + return 0; + } + + public void readerIdle() + { + _readerHasBeenIdle = true; + } + + public void setNetworkDriver(NetworkDriver driver) + { + _driver = driver; + } + + public void writeFrame(AMQDataBlock frame) + { + + } + + public void writerIdle() + { + _writerHasBeenIdle = true; + } + + public void exception(Throwable t) + { + _exceptionLatch.countDown(); + } + + public CountDownLatch getExceptionLatch() + { + return _exceptionLatch; + } + + public void received(ByteBuffer msg) + { + // increment read bytes and count down the latch for that many + int bytes = msg.remaining(); + _readBytes += bytes; + for (int i = 0; i < bytes; i++) + { + _latch.countDown(); + } + + // Throw an error if we've been asked too, but we can still count + if (_nextReadErrors) + { + throw new RuntimeException("Was asked to error"); + } + } + + public CountDownLatch getLatch() + { + return _latch; + } + + public boolean getWriterHasBeenIdle() + { + return _writerHasBeenIdle; + } + + public boolean getReaderHasBeenIdle() + { + return _readerHasBeenIdle; + } + + public void setClosed(boolean _closed) + { + this._closed = _closed; + } + + public boolean getClosed() + { + return _closed; + } + } + + private class EchoProtocolEngine extends CountingProtocolEngine + { + + public void received(ByteBuffer msg) + { + super.received(msg); + msg.rewind(); + _driver.send(msg); + } + } +} \ No newline at end of file --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org