Author: rgodfrey Date: Mon Nov 16 09:50:41 2015 New Revision: 1714530 URL: http://svn.apache.org/viewvc?rev=1714530&view=rev Log: QPID-6843 : Implement heartbeating for AMQP 1.0
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Nov 16 09:50:41 2015 @@ -60,9 +60,6 @@ public class NonBlockingConnection imple private final AtomicLong _usedOutboundMessageSpace = new AtomicLong(); private final long _outboundMessageBufferLimit; - private volatile int _maxReadIdle; - private volatile int _maxWriteIdle; - private volatile boolean _fullyWritten = true; private boolean _partialRead = false; @@ -73,6 +70,8 @@ public class NonBlockingConnection imple private final String _threadName; private volatile SelectorThread.SelectionTask _selectionTask; private Iterator<Runnable> _pendingIterator; + private final AtomicLong _maxWriteIdleMillis = new AtomicLong(); + private final AtomicLong _maxReadIdleMillis = new AtomicLong(); public NonBlockingConnection(SocketChannel socketChannel, ProtocolEngine protocolEngine, @@ -169,15 +168,15 @@ public class NonBlockingConnection imple } @Override - public void setMaxWriteIdle(int sec) + public void setMaxWriteIdleMillis(final long millis) { - _maxWriteIdle = sec; + _maxWriteIdleMillis.set(millis); } @Override - public void setMaxReadIdle(int sec) + public void setMaxReadIdleMillis(final long millis) { - _maxReadIdle = sec; + _maxReadIdleMillis.set(millis); } @Override @@ -193,15 +192,15 @@ public class NonBlockingConnection imple } @Override - public int getMaxReadIdle() + public long getMaxReadIdleMillis() { - return _maxReadIdle; + return _maxReadIdleMillis.get(); } @Override - public int getMaxWriteIdle() + public long getMaxWriteIdleMillis() { - return _maxWriteIdle; + return _maxWriteIdleMillis.get(); } @Override Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Nov 16 09:50:41 2015 @@ -173,7 +173,7 @@ public class NonBlockingNetworkTransport _port); engine.setNetworkConnection(connection); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT); idleTimeoutTicker.setConnection(connection); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Nov 16 09:50:41 2015 @@ -283,14 +283,14 @@ public class ServerConnectionDelegate ex heartbeat = 0; } - networkConnection.setMaxReadIdle(2 * heartbeat); - networkConnection.setMaxWriteIdle(heartbeat); + networkConnection.setMaxReadIdleMillis(2000L * heartbeat); + networkConnection.setMaxWriteIdleMillis(1000L * heartbeat); } else { - networkConnection.setMaxReadIdle(0); - networkConnection.setMaxWriteIdle(0); + networkConnection.setMaxReadIdleMillis(0); + networkConnection.setMaxWriteIdleMillis(0); } setConnectionTuneOkChannelMax(sconn, okChannelMax); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Mon Nov 16 09:50:41 2015 @@ -550,13 +550,13 @@ public class AMQPConnection_0_8 { if (delay > 0) { - _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay); + _network.setMaxWriteIdleMillis(1000L * delay); + _network.setMaxReadIdleMillis(1000L * BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay); } else { - _network.setMaxWriteIdle(0); - _network.setMaxReadIdle(0); + _network.setMaxWriteIdleMillis(0); + _network.setMaxReadIdleMillis(0); } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Mon Nov 16 09:50:41 2015 @@ -134,13 +134,12 @@ public class FrameHandler implements Pro state = State.ERROR; break; } - - else if (size > _connection.getDesiredMaxFrameSize().intValue()) + else if(size > _connection.getMaxFrameSize()) { frameParsingError = createFramingError( "specified frame size %d larger than maximum frame header size %d", size, - _connection.getDesiredMaxFrameSize().intValue()); + _connection.getMaxFrameSize()); state = State.ERROR; break; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Nov 16 09:50:41 2015 @@ -136,7 +136,6 @@ public class ConnectionEndpoint implemen private boolean _saslComplete; - private UnsignedInteger _desiredMaxFrameSize = UnsignedInteger.valueOf(DEFAULT_MAX_FRAME); private Runnable _onSaslCompleteTask; private SaslServerProvider _saslServerProvider; @@ -153,6 +152,7 @@ public class ConnectionEndpoint implemen private Principal _externalPrincipal; private List<Runnable> _postLockActions = new ArrayList<>(); private Map _remoteProperties; + private long _desiredIdleTimeout; public ConnectionEndpoint(Container container, SaslServerProvider cbs) { @@ -294,8 +294,9 @@ public class ConnectionEndpoint implemen } open.setChannelMax(UnsignedShort.valueOf((short) channelMax)); open.setContainerId(_container.getId()); - open.setMaxFrameSize(getDesiredMaxFrameSize()); + open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); open.setHostname(getRemoteHostname()); + open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout)); if (_properties != null) { open.setProperties(_properties); @@ -304,18 +305,6 @@ public class ConnectionEndpoint implemen send(CONNECTION_CONTROL_CHANNEL, open); } - public UnsignedInteger getDesiredMaxFrameSize() - { - return _desiredMaxFrameSize; - } - - - public void setDesiredMaxFrameSize(UnsignedInteger size) - { - _desiredMaxFrameSize = size; - } - - private void closeSender() { setClosedForOutput(true); @@ -363,12 +352,8 @@ public class ConnectionEndpoint implemen _receivingSessions = new SessionEndpoint[_channelMax + 1]; _sendingSessions = new SessionEndpoint[_channelMax + 1]; } - UnsignedInteger remoteDesiredMaxFrameSize = - open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize(); - _maxFrameSize = (remoteDesiredMaxFrameSize.compareTo(_desiredMaxFrameSize) < 0 - ? remoteDesiredMaxFrameSize - : _desiredMaxFrameSize).intValue(); + _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue(); _remoteContainerId = open.getContainerId(); _localHostname = open.getHostname(); @@ -390,11 +375,7 @@ public class ConnectionEndpoint implemen // TODO bad stuff (connection already open) } - /*if(_state == ConnectionState.AWAITING_OPEN) - { - _state = ConnectionState.OPEN; - } -*/ + notifyAll(); } @@ -776,6 +757,16 @@ public class ConnectionEndpoint implemen _externalPrincipal = externalPrincipal; } + public void setDesiredIdleTimeout(final long desiredIdleTimeout) + { + _desiredIdleTimeout = desiredIdleTimeout; + } + + public long getDesiredIdleTimeout() + { + return _desiredIdleTimeout; + } + public static interface FrameReceiptLogger { boolean isEnabled(); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Mon Nov 16 09:50:41 2015 @@ -45,6 +45,7 @@ import org.apache.qpid.amqp_1_0.framing. import org.apache.qpid.amqp_1_0.framing.FrameHandler; import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler; +import org.apache.qpid.amqp_1_0.framing.TransportFrame; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; @@ -52,11 +53,14 @@ import org.apache.qpid.amqp_1_0.transpor import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; +import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.ConnectionClosingTicker; @@ -157,6 +161,7 @@ public class AMQPConnection_1_0 extends _connection.setAmqpConnection(this); _endpoint = _connection.getConnectionEndpoint(); _endpoint.setConnectionEventListener(_connection); + _endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay()); _endpoint.setFrameOutputHandler(this); final List<String> mechanisms = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()).getMechanisms(); ByteBuffer headerResponse = useSASL ? initiateSasl() : initiateNonSasl(mechanisms); @@ -265,14 +270,25 @@ public class AMQPConnection_1_0 extends return headerResponse; } + @Override public void writerIdle() { - //Todo + send(TransportFrame.createAMQFrame((short)0,null)); } + @Override public void readerIdle() { - //Todo + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + @Override + public Object run() + { + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + getNetwork().close(); + return null; + } + }, getAccessControllerContext()); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Mon Nov 16 09:50:41 2015 @@ -46,6 +46,7 @@ import org.apache.qpid.amqp_1_0.transpor import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; +import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; @@ -61,6 +62,7 @@ import org.apache.qpid.server.virtualhos public class Connection_1_0 implements ConnectionEventListener { + private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L; private final AmqpPort<?> _port; private final SubjectCreator _subjectCreator; private AMQPConnection_1_0 _amqpConnection; @@ -139,39 +141,56 @@ public class Connection_1_0 implements C } _amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId()); } - // TODO implement AMQP 1.0 heartbeating - _amqpConnection.getNetwork().setMaxReadIdle(0); - _amqpConnection.getNetwork().setMaxWriteIdle(0); - - _vhost = ((AmqpPort)_port).getVirtualHost(host); - if(_vhost == null) + _amqpConnection.getNetwork().setMaxReadIdleMillis(_connectionEndpoint.getDesiredIdleTimeout()); + long idleTimeout = _connectionEndpoint.getIdleTimeout(); + if(idleTimeout != 0L && idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT) { - final Error err = new Error(); - err.setCondition(AmqpError.NOT_FOUND); - err.setDescription("Unknown hostname in connection open: '" + host + "'"); - _connectionEndpoint.close(err); + _connectionEndpoint.close(new Error(ConnectionError.CONNECTION_FORCED, + "Requested idle timeout of " + + idleTimeout + + " is too low. The minimum supported timeout is" + + MINIMUM_SUPPORTED_IDLE_TIMEOUT)); + _amqpConnection.close(); _closedOnOpen = true; } else { - final Principal user = _connectionEndpoint.getUser(); - if(user != null) - { - setUserPrincipal(user); - } - _amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal()); - _amqpConnection.updateAccessControllerContext(); - if(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) == null) + _amqpConnection.getNetwork().setMaxWriteIdleMillis(idleTimeout / 2L); + + _vhost = ((AmqpPort) _port).getVirtualHost(host); + if (_vhost == null) { final Error err = new Error(); - err.setCondition(AmqpError.NOT_ALLOWED); - err.setDescription("Connection has not been authenticated"); + err.setCondition(AmqpError.NOT_FOUND); + err.setDescription("Unknown hostname in connection open: '" + host + "'"); _connectionEndpoint.close(err); + _amqpConnection.close(); + _closedOnOpen = true; } else { - _amqpConnection.virtualHostAssociated(); + final Principal user = _connectionEndpoint.getUser(); + if (user != null) + { + setUserPrincipal(user); + } + _amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal()); + _amqpConnection.updateAccessControllerContext(); + if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) + == null) + { + final Error err = new Error(); + err.setCondition(AmqpError.NOT_ALLOWED); + err.setDescription("Connection has not been authenticated"); + _connectionEndpoint.close(err); + _amqpConnection.close(); + _closedOnOpen = true; + } + else + { + _amqpConnection.virtualHostAssociated(); + } } } } Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original) +++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Mon Nov 16 09:50:41 2015 @@ -318,8 +318,8 @@ class WebSocketProvider implements Accep private final AtomicLong _usedOutboundMessageSpace = new AtomicLong(); private Certificate _certificate; - private int _maxWriteIdle; - private int _maxReadIdle; + private long _maxWriteIdleMillis; + private long _maxReadIdleMillis; public ConnectionWrapper(final WebSocket.Connection connection, final SocketAddress localAddress, @@ -378,15 +378,15 @@ class WebSocketProvider implements Accep } @Override - public void setMaxWriteIdle(final int sec) + public void setMaxWriteIdleMillis(final long millis) { - _maxWriteIdle = sec; + _maxWriteIdleMillis = millis; } @Override - public void setMaxReadIdle(final int sec) + public void setMaxReadIdleMillis(final long millis) { - _maxReadIdle = sec; + _maxReadIdleMillis = millis; } @Override @@ -402,15 +402,15 @@ class WebSocketProvider implements Accep } @Override - public int getMaxReadIdle() + public long getMaxReadIdleMillis() { - return _maxReadIdle; + return _maxReadIdleMillis; } @Override - public int getMaxWriteIdle() + public long getMaxWriteIdleMillis() { - return _maxWriteIdle; + return _maxWriteIdleMillis; } @Override Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Nov 16 09:50:41 2015 @@ -154,9 +154,9 @@ public class AMQProtocolSession implemen if (delay > 0) { NetworkConnection network = getProtocolHandler().getNetworkConnection(); - network.setMaxWriteIdle(delay); + network.setMaxWriteIdleMillis(1000L*delay); int readerIdle = (int)(delay * timeoutFactor); - network.setMaxReadIdle(readerIdle); + network.setMaxReadIdleMillis(1000L * readerIdle); } } Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java (original) +++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java Mon Nov 16 09:50:41 2015 @@ -58,11 +58,6 @@ public class TestNetworkConnection imple return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); } - public void setMaxReadIdle(int idleTime) - { - - } - @Override public Principal getPeerPrincipal() { @@ -76,18 +71,25 @@ public class TestNetworkConnection imple } @Override - public int getMaxReadIdle() + public long getMaxReadIdleMillis() + { + return 0L; + } + + @Override + public long getMaxWriteIdleMillis() { - return 0; + return 0L; } @Override - public int getMaxWriteIdle() + public void setMaxWriteIdleMillis(final long millis) { - return 0; + } - public void setMaxWriteIdle(int idleTime) + @Override + public void setMaxReadIdleMillis(final long millis) { } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Mon Nov 16 09:50:41 2015 @@ -153,8 +153,8 @@ public class ClientDelegate extends Conn maxFrameSize, actualHeartbeatInterval); - conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); - conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); + conn.getNetworkConnection().setMaxReadIdleMillis((long) (1000L * (actualHeartbeatInterval * heartbeatTimeoutFactor))); + conn.getNetworkConnection().setMaxWriteIdleMillis(1000L *actualHeartbeatInterval); conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize); int channelMax = tune.getChannelMax(); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Mon Nov 16 09:50:41 2015 @@ -44,15 +44,16 @@ public interface NetworkConnection */ SocketAddress getLocalAddress(); - void setMaxWriteIdle(int sec); + void setMaxWriteIdleMillis(long millis); + void setMaxReadIdleMillis(long millis); - void setMaxReadIdle(int sec); Principal getPeerPrincipal(); Certificate getPeerCertificate(); - int getMaxReadIdle(); + long getMaxReadIdleMillis(); + + long getMaxWriteIdleMillis(); - int getMaxWriteIdle(); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java Mon Nov 16 09:50:41 2015 @@ -41,14 +41,14 @@ public class IdleTimeoutTicker implement public int getTimeToNextTick(long currentTime) { long nextTime = -1; - final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + final long maxReadIdle = _connection.getMaxReadIdleMillis(); if(maxReadIdle > 0) { nextTime = _transport.getLastReadTime() + maxReadIdle; } - long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + long maxWriteIdle = _connection.getMaxWriteIdleMillis(); if(maxWriteIdle > 0) { @@ -65,13 +65,13 @@ public class IdleTimeoutTicker implement public int tick(long currentTime) { // writer Idle - long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + long maxWriteIdle = _connection.getMaxWriteIdleMillis(); if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) { _transport.writerIdle(); } // reader Idle - final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + final long maxReadIdle = _connection.getMaxReadIdleMillis(); if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) { Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Mon Nov 16 09:50:41 2015 @@ -43,12 +43,12 @@ public class IoNetworkConnection impleme private final long _timeout; private final IoSender _ioSender; private final IoReceiver _ioReceiver; - private int _maxReadIdle; - private int _maxWriteIdle; private Principal _principal; private boolean _principalChecked; private final Object _lock = new Object(); private Certificate _certificate; + private long _maxWriteIdleMillis; + private long _maxReadIdleMillis; public IoNetworkConnection(Socket socket, ExceptionHandlingByteBufferReceiver delegate, int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) @@ -98,14 +98,16 @@ public class IoNetworkConnection impleme return _socket.getLocalSocketAddress(); } - public void setMaxWriteIdle(int sec) + @Override + public void setMaxWriteIdleMillis(final long millis) { - _maxWriteIdle = sec; + _maxWriteIdleMillis = millis; } - public void setMaxReadIdle(int sec) + @Override + public void setMaxReadIdleMillis(final long millis) { - _maxReadIdle = sec; + _maxReadIdleMillis = millis; } @Override @@ -154,14 +156,14 @@ public class IoNetworkConnection impleme } @Override - public int getMaxReadIdle() + public long getMaxReadIdleMillis() { - return _maxReadIdle; + return _maxReadIdleMillis; } @Override - public int getMaxWriteIdle() + public long getMaxWriteIdleMillis() { - return _maxWriteIdle; + return _maxWriteIdleMillis; } } Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java Mon Nov 16 09:50:41 2015 @@ -37,10 +37,10 @@ public class IdleTimeoutTickerTest exten private long _lastReadTime; private long _lastWriteTime; private long _currentTime; - private int _maxWriteIdle; - private int _maxReadIdle; private boolean _readerIdle; private boolean _writerIdle; + private long _maxReadIdleMillis; + private long _maxWriteIdleMillis; @Override public void setUp() throws Exception @@ -52,14 +52,14 @@ public class IdleTimeoutTickerTest exten _writerIdle = false; _lastReadTime = 0l; _lastWriteTime = 0l; - _maxReadIdle = 0; - _maxWriteIdle = 0; + _maxReadIdleMillis = 0; + _maxWriteIdleMillis = 0; } public void testNoIdle() throws Exception { - _maxReadIdle = 4; - _maxWriteIdle = 2; + _maxReadIdleMillis = 4000; + _maxWriteIdleMillis = 2000; _lastReadTime = 0; _lastWriteTime = 1500; _currentTime = 3000; @@ -82,13 +82,13 @@ public class IdleTimeoutTickerTest exten assertFalse("Incorrectly caused reader idle", _readerIdle); assertFalse("Incorrectly caused writer idle", _writerIdle); - _maxReadIdle = 0; + _maxReadIdleMillis = 0; nextTime = _ticker.tick(_currentTime); assertEquals("Incorrect next tick calculation", 1700l, nextTime); assertFalse("Incorrectly caused reader idle", _readerIdle); assertFalse("Incorrectly caused writer idle", _writerIdle); - _maxWriteIdle = 0; + _maxWriteIdleMillis = 0; nextTime = _ticker.tick(_currentTime); assertEquals("Incorrect next tick calculation", DEFAULT_TIMEOUT, nextTime); assertFalse("Incorrectly caused reader idle", _readerIdle); @@ -98,8 +98,8 @@ public class IdleTimeoutTickerTest exten public void testReaderIdle() throws Exception { - _maxReadIdle = 4; - _maxWriteIdle = 0; + _maxReadIdleMillis = 4000; + _maxWriteIdleMillis = 0; _lastReadTime = 0; _lastWriteTime = 2500; _currentTime = 4000; @@ -113,7 +113,7 @@ public class IdleTimeoutTickerTest exten _readerIdle = false; // last write = 2.5s, max write idle = 2s, should check in 0.5s - _maxWriteIdle = 2; + _maxWriteIdleMillis = 2000; nextTime = _ticker.tick(_currentTime); assertTrue(_readerIdle); assertFalse(_writerIdle); @@ -131,8 +131,8 @@ public class IdleTimeoutTickerTest exten public void testWriterIdle() throws Exception { - _maxReadIdle = 0; - _maxWriteIdle = 2; + _maxReadIdleMillis = 0; + _maxWriteIdleMillis = 2000; _lastReadTime = 0; _lastWriteTime = 1500; _currentTime = 4000; @@ -146,7 +146,7 @@ public class IdleTimeoutTickerTest exten _writerIdle = false; _lastWriteTime = 1500; - _maxReadIdle = 5; + _maxReadIdleMillis = 5000; nextTime = _ticker.tick(_currentTime); @@ -219,15 +219,15 @@ public class IdleTimeoutTickerTest exten } @Override - public void setMaxWriteIdle(int sec) + public void setMaxWriteIdleMillis(final long millis) { - _maxWriteIdle = sec; + _maxWriteIdleMillis = millis; } @Override - public void setMaxReadIdle(int sec) + public void setMaxReadIdleMillis(final long millis) { - _maxReadIdle = sec; + _maxReadIdleMillis = millis; } @Override @@ -243,14 +243,14 @@ public class IdleTimeoutTickerTest exten } @Override - public int getMaxReadIdle() + public long getMaxReadIdleMillis() { - return _maxReadIdle; + return _maxReadIdleMillis; } @Override - public int getMaxWriteIdle() + public long getMaxWriteIdleMillis() { - return _maxWriteIdle; + return _maxWriteIdleMillis; } } Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1714530&r1=1714529&r2=1714530&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Mon Nov 16 09:50:41 2015 @@ -367,8 +367,8 @@ public class MaxFrameSizeTest extends Qp _maxFrameSize, actualHeartbeatInterval); - conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); - conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); + conn.getNetworkConnection().setMaxReadIdleMillis((long)(1000L * actualHeartbeatInterval*heartbeatTimeoutFactor)); + conn.getNetworkConnection().setMaxWriteIdleMillis(1000L * actualHeartbeatInterval); conn.setMaxFrameSize(_maxFrameSize); int channelMax = tune.getChannelMax(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org