Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Feb 10 16:15:08 2015 @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -64,7 +65,9 @@ import org.apache.qpid.AMQUnresolvedAddr import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -192,6 +195,22 @@ public class AMQConnection extends Close private boolean _compressMessages; private int _messageCompressionThresholdSize; + static + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Qpid version : " + QpidProperties.getVersionString()); + } + + // The registering of any additional SASL mechanisms with the Java Security API requires + // SecurityManager permissions. In execution environments such as web containers, + // this may require adjustments to the Java security.policy. + CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Loaded mechanisms " + registry.getMechanisms()); + } + } /** * @param broker brokerdetails * @param username username @@ -847,13 +866,17 @@ public class AMQConnection extends Close public void close(long timeout) throws JMSException { - close(new ArrayList<AMQSession>(_sessions.values()), timeout); - } + boolean closed; - public void close(List<AMQSession> sessions, long timeout) throws JMSException - { - if (!setClosed()) + synchronized (_sessionCreationLock) + { + closed = setClosed(); + } + + if (!closed) { + List<AMQSession> sessions = new ArrayList<>(_sessions.values()); + setClosing(true); try { @@ -868,54 +891,52 @@ public class AMQConnection extends Close private void doClose(List<AMQSession> sessions, long timeout) throws JMSException { - synchronized (_sessionCreationLock) + if (!sessions.isEmpty()) { - if (!sessions.isEmpty()) + AMQSession session = sessions.remove(0); + synchronized (session.getMessageDeliveryLock()) { - AMQSession session = sessions.remove(0); - synchronized (session.getMessageDeliveryLock()) - { - doClose(sessions, timeout); - } + doClose(sessions, timeout); } - else + } + else + { + synchronized (getFailoverMutex()) { - synchronized (getFailoverMutex()) + try { try { - try - { - closeAllSessions(null, timeout); - } - finally - { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); - } + closeAllSessions(null, timeout); } - catch (JMSException e) + finally { - _logger.error("Error closing connection", e); - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } - finally + } + catch (JMSException e) + { + _logger.error("Error closing connection", e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + finally + { + try + { + _delegate.closeConnection(timeout); + } + catch (Exception e) { - try - { - _delegate.closeConnection(timeout); - } - catch (Exception e) - { - _logger.warn("Error closing underlying protocol connection", e); - } + _logger.warn("Error closing underlying protocol connection", e); } } } } + } private void shutdownTaskPool(final long timeout) @@ -1290,28 +1311,29 @@ public class AMQConnection extends Close try { - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + // decide if we are going to close the session + if (hardError(cause)) { - // decide if we are going to close the session - if (hardError(cause)) - { - closer = (!setClosed()) || closer; - { - _logger.info("Closing AMQConnection due to :" + cause); - } - } - else + closer = (!setClosed()) || closer; { - _logger.info("Not a hard-error connection not closing: " + cause); + _logger.info("Closing AMQConnection due to :" + cause); } + } + else + { + _logger.info("Not a hard-error connection not closing: " + cause); + } - // if we are closing the connection, close sessions first - if (closer) + + // if we are closing the connection, close sessions first + if (closer) + { + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { try { - closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); } catch (JMSException e) { @@ -1328,16 +1350,32 @@ public class AMQConnection extends Close private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) { - // deliver the exception if there is a listener - ExceptionListener exceptionListener = getExceptionListenerNoCheck(); + final ExceptionListener exceptionListener = getExceptionListenerNoCheck(); if (exceptionListener != null) { - exceptionListener.onException(je); - } - else + performConnectionTask(new Runnable() + { + @Override + public void run() + { + // deliver the exception if there is a listener + try + { + exceptionListener.onException(je); + } + catch (RuntimeException e) + { + _logger.error("Exception occurred in ExceptionListener", e); + } + } + }); + } + else { _logger.error("Throwable Received but no listener set: " + cause); } + + } private boolean hardError(Throwable cause) @@ -1448,7 +1486,17 @@ public class AMQConnection extends Close public void performConnectionTask(Runnable task) { - _taskPool.execute(task); + try + { + _taskPool.execute(task); + } + catch (RejectedExecutionException e) + { + if(!(isClosed() || isClosing())) + { + throw e; + } + } } public AMQSession getSession(int channelId)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Feb 10 16:15:08 2015 @@ -291,7 +291,7 @@ public class AMQConnectionDelegate_0_10 public void closed(Connection conn) { - ConnectionException exc = exception; + final ConnectionException exc = exception; exception = null; if (exc == null) @@ -299,7 +299,7 @@ public class AMQConnectionDelegate_0_10 return; } - ConnectionClose close = exc.getClose(); + final ConnectionClose close = exc.getClose(); if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED) { _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); @@ -332,23 +332,31 @@ public class AMQConnectionDelegate_0_10 _conn.setClosed(); - ExceptionListener listener = _conn.getExceptionListenerNoCheck(); + final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { _logger.error("connection exception: " + conn, exc); } else { - String code = null; - if (close != null) + _conn.performConnectionTask(new Runnable() { - code = close.getReplyCode().toString(); - } + @Override + public void run() + { + String code = null; + if (close != null) + { + code = close.getReplyCode().toString(); + } + + JMSException ex = new JMSException(exc.getMessage(), code); + ex.setLinkedException(exc); + ex.initCause(exc); + listener.onException(ex); + } + }); - JMSException ex = new JMSException(exc.getMessage(), code); - ex.setLinkedException(exc); - ex.initCause(exc); - listener.onException(ex); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 10 16:15:08 2015 @@ -808,7 +808,16 @@ public abstract class AMQSession<C exten */ public void close(long timeout) throws JMSException { - close(timeout, true); + synchronized (_messageDeliveryLock) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (getFailoverMutex()) + { + + close(timeout, true); + } + } } private void close(long timeout, boolean sendClose) throws JMSException @@ -822,52 +831,44 @@ public abstract class AMQSession<C exten if (!setClosed()) { setClosing(true); - synchronized (getFailoverMutex()) + // we pass null since this is not an error case + closeProducersAndConsumers(null); + + try { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + // If the connection is open or we are in the process + // of closing the connection then send a cance + // no point otherwise as the connection will be gone + if (!_connection.isClosed() || _connection.isClosing()) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try + if (sendClose) { - // If the connection is open or we are in the process - // of closing the connection then send a cance - // no point otherwise as the connection will be gone - if (!_connection.isClosed() || _connection.isClosing()) - { - if (sendClose) - { - sendClose(timeout); - } - } - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - catch (TransportException e) - { - throw toJMSException("Error closing session:" + e.getMessage(), e); - } - finally - { - _connection.deregisterSession(_channelId); + sendClose(timeout); } } } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + catch (TransportException e) + { + throw toJMSException("Error closing session:" + e.getMessage(), e); + } + finally + { + _connection.deregisterSession(_channelId); + } } } @@ -899,24 +900,22 @@ public abstract class AMQSession<C exten if (!setClosed()) { - synchronized (_messageDeliveryLock) + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + amqe = (AMQException) e; } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); } + } protected void stopDispatcherThread() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Feb 10 16:15:08 2015 @@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQS private void returnBouncedMessage(final ReturnMessage msg) { - getAMQConnection().performConnectionTask(new Runnable() + try { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, - _topicDestinationCache, AMQDestination.UNKNOWN_TYPE); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } else if (errorCode == AMQConstant.NO_ROUTE) - { - getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } else - { - getAMQConnection().exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + getMessageFactoryRegistry().createMessage(0, + false, + msg.getExchange(), + msg.getRoutingKey(), + msg.getContentHeader(), + msg.getBodies(), + _queueDestinationCache, + _topicDestinationCache, + AMQDestination.UNKNOWN_TYPE); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - } catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, + bouncedMessage, + null)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); } - }); + else + { + getAMQConnection().exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); + } + + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Feb 10 16:15:08 2015 @@ -598,35 +598,38 @@ public abstract class BasicMessageConsum if (sendClose) { + // The Synchronized block only needs to protect network traffic. - synchronized (_connection.getFailoverMutex()) + + try { - try + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) { - // If the session is open or we are in the process - // of closing the session then send a cance - // no point otherwise as the connection will be gone - if (!_session.isClosed() || _session.isClosing()) + synchronized(_session.getMessageDeliveryLock()) { - synchronized(_session.getMessageDeliveryLock()) + synchronized (_connection.getFailoverMutex()) { sendCancel(); } } } - catch (AMQException e) - { - throw new JMSAMQException("Error closing consumer: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("FailoverException interrupted basic cancel.", e); - } - catch (TransportException e) - { - throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); - } } + catch (AMQException e) + { + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); + } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } + } else { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Tue Feb 10 16:15:08 2015 @@ -20,14 +20,14 @@ */ package org.apache.qpid.client; +import java.io.IOException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.URLSyntaxException; -import java.io.IOException; - public class MockAMQConnection extends AMQConnection { public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost) @@ -60,4 +60,10 @@ public class MockAMQConnection extends A { return super.getDelegate(); } + + @Override + public void performConnectionTask(final Runnable task) + { + task.run(); + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java Tue Feb 10 16:15:08 2015 @@ -42,7 +42,6 @@ public class ServerDecoder extends AMQDe throws AMQFrameDecodingException, IOException { ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor(); - ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId); final int classAndMethod = in.readInt(); int classId = classAndMethod >> 16; int methodId = classAndMethod & 0xFFFF; @@ -115,116 +114,117 @@ public class ServerDecoder extends AMQDe ChannelOpenBody.process(channelId, in, methodProcessor); break; case 0x00140014: - ChannelFlowBody.process(in, channelMethodProcessor); + ChannelFlowBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00140015: - ChannelFlowOkBody.process(in, channelMethodProcessor); + ChannelFlowOkBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00140028: - ChannelCloseBody.process(in, channelMethodProcessor); + ChannelCloseBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00140029: - channelMethodProcessor.receiveChannelCloseOk(); + methodProcessor.getChannelMethodProcessor(channelId).receiveChannelCloseOk(); break; // ACCESS_CLASS: case 0x001e000a: - AccessRequestBody.process(in, channelMethodProcessor); + AccessRequestBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; // EXCHANGE_CLASS: case 0x0028000a: - ExchangeDeclareBody.process(in, channelMethodProcessor); + ExchangeDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00280014: - ExchangeDeleteBody.process(in, channelMethodProcessor); + ExchangeDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00280016: - ExchangeBoundBody.process(in, channelMethodProcessor); + ExchangeBoundBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; // QUEUE_CLASS: case 0x0032000a: - QueueDeclareBody.process(in, channelMethodProcessor); + QueueDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00320014: - QueueBindBody.process(in, channelMethodProcessor); + QueueBindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x0032001e: - QueuePurgeBody.process(in, channelMethodProcessor); + QueuePurgeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00320028: - QueueDeleteBody.process(in, channelMethodProcessor); + QueueDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x00320032: - QueueUnbindBody.process(in, channelMethodProcessor); + QueueUnbindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; // BASIC_CLASS: case 0x003c000a: - BasicQosBody.process(in, channelMethodProcessor); + BasicQosBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0014: - BasicConsumeBody.process(in, channelMethodProcessor); + BasicConsumeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c001e: - BasicCancelBody.process(in, channelMethodProcessor); + BasicCancelBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0028: - BasicPublishBody.process(in, channelMethodProcessor); + BasicPublishBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0046: - BasicGetBody.process(in, channelMethodProcessor); + BasicGetBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0050: - BasicAckBody.process(in, channelMethodProcessor); + BasicAckBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c005a: - BasicRejectBody.process(in, channelMethodProcessor); + BasicRejectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0064: - BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor); + BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), + methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0066: - BasicRecoverSyncBody.process(in, channelMethodProcessor); + BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c006e: - BasicRecoverSyncBody.process(in, channelMethodProcessor); + BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; case 0x003c0078: - BasicNackBody.process(in, channelMethodProcessor); + BasicNackBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; // CONFIRM CLASS: case 0x0055000a: - ConfirmSelectBody.process(in, channelMethodProcessor); + ConfirmSelectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId)); break; // TX_CLASS: case 0x005a000a: - if(!channelMethodProcessor.ignoreAllButCloseOk()) + if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk()) { - channelMethodProcessor.receiveTxSelect(); + methodProcessor.getChannelMethodProcessor(channelId).receiveTxSelect(); } break; case 0x005a0014: - if(!channelMethodProcessor.ignoreAllButCloseOk()) + if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk()) { - channelMethodProcessor.receiveTxCommit(); + methodProcessor.getChannelMethodProcessor(channelId).receiveTxCommit(); } break; case 0x005a001e: - if(!channelMethodProcessor.ignoreAllButCloseOk()) + if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk()) { - channelMethodProcessor.receiveTxRollback(); + methodProcessor.getChannelMethodProcessor(channelId).receiveTxRollback(); } break; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java Tue Feb 10 16:15:08 2015 @@ -20,12 +20,8 @@ */ package org.apache.qpid.common; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.InputStream; -import java.util.Map; import java.util.Properties; /** @@ -37,18 +33,9 @@ import java.util.Properties; * * <p>To get the build version of any Qpid code call the {@link #main} method. This version string is usually also * printed to the console on broker start up. - * <p> - * TODO Code to locate/load/log properties can be factored into a reusable properties utils class. Avoid having this - * same snippet of loading code scattered in many places. - * <p> - * TODO Could also add a build number property for a sequential build number assigned by an automated build system, for - * build reproducability purposes. */ public class QpidProperties { - /** Used for debugging purposes. */ - private static final Logger _logger = LoggerFactory.getLogger(QpidProperties.class); - /** The name of the version properties file to load from the class path. */ public static final String VERSION_RESOURCE = "qpidversion.properties"; @@ -68,53 +55,43 @@ public class QpidProperties private static final String DEFAULT = "unknown"; /** Holds the product name. */ - private static String productName = DEFAULT; + private static final String productName; /** Holds the product version. */ - private static String releaseVersion = DEFAULT; + private static final String releaseVersion; /** Holds the source code revision. */ - private static String buildVersion = DEFAULT; + private static final String buildVersion; + + private static final Properties properties = new Properties(); // Loads the values from the version properties file. static { - Properties props = new Properties(); - try + try(InputStream propertyStream = QpidProperties.class.getClassLoader().getResourceAsStream(VERSION_RESOURCE)) { - InputStream propertyStream = QpidProperties.class.getClassLoader().getResourceAsStream(VERSION_RESOURCE); - if (propertyStream == null) + if (propertyStream != null) { - _logger.warn("Unable to find resource " + VERSION_RESOURCE + " from classloader"); - } - else - { - props.load(propertyStream); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Dumping QpidProperties"); - for (Map.Entry<Object, Object> entry : props.entrySet()) - { - _logger.debug("Property: " + entry.getKey() + " Value: " + entry.getValue()); - } - - _logger.debug("End of property dump"); - } - - productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY); - String versionSuffix = (String) props.get(RELEASE_VERSION_SUFFIX); - String version = readPropertyValue(props, RELEASE_VERSION_PROPERTY); - releaseVersion = versionSuffix == null || "".equals(versionSuffix) ? version : version + ";" + versionSuffix; - buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY); + properties.load(propertyStream); } } catch (IOException e) { - // Log a warning about this and leave the values initialized to unknown. - _logger.error("Could not load version.properties resource: " + e, e); + // Ignore, most likely running within an IDE, values will have the DEFAULT text } + + String versionSuffix = properties.getProperty(RELEASE_VERSION_SUFFIX); + String version = properties.getProperty(RELEASE_VERSION_PROPERTY, DEFAULT); + + productName = properties.getProperty(PRODUCT_NAME_PROPERTY, DEFAULT); + releaseVersion = versionSuffix == null || "".equals(versionSuffix) ? version : version + ";" + versionSuffix; + buildVersion = properties.getProperty(BUILD_VERSION_PROPERTY, DEFAULT); + } + + public static Properties asProperties() + { + return new Properties(properties); } /** @@ -158,27 +135,6 @@ public class QpidProperties } /** - * Helper method to extract a named property from properties. - * - * @param props The properties. - * @param propertyName The named property to extract. - * - * @return The extracted property or a default value if the properties do not contain the named property. - * - * @todo A bit pointless. - */ - private static String readPropertyValue(Properties props, String propertyName) - { - String retVal = (String) props.get(propertyName); - if (retVal == null) - { - retVal = DEFAULT; - } - - return retVal; - } - - /** * Prints the versioning information to the console. This is extremely usefull for identifying Qpid code in the * wild, where the origination of the code has been forgotten. * Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Tue Feb 10 16:15:08 2015 @@ -18,6 +18,17 @@ package org.apache.qpid.configuration; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * This class centralized the Qpid client properties. * @@ -25,6 +36,8 @@ package org.apache.qpid.configuration; */ public class ClientProperties { + private static final Logger LOGGER = LoggerFactory.getLogger(ClientProperties.class); + /** * Currently with Qpid it is not possible to change the client ID. * If one is not specified upon connection construction, an id is generated automatically. @@ -292,6 +305,48 @@ public class ClientProperties */ public static final String QPID_USE_LEGACY_GETQUEUEDEPTH_BEHAVIOUR = "qpid.use_legacy_getqueuedepth_behavior"; + static + { + // force load of common properties + Class<CommonProperties> commonPropertiesClass = CommonProperties.class; + + Properties props = new Properties(); + String initialProperties = System.getProperty("qpid.client_properties_file"); + URL initialPropertiesLocation = null; + try + { + if (initialProperties == null) + { + initialPropertiesLocation = ClientProperties.class.getClassLoader().getResource("qpid-client.properties"); + } + else + { + initialPropertiesLocation = (new File(initialProperties)).toURI().toURL(); + } + + if (initialPropertiesLocation != null) + { + props.load(initialPropertiesLocation.openStream()); + } + } + catch (MalformedURLException e) + { + LOGGER.warn("Could not open client properties file '"+initialProperties+"'.", e); + } + catch (IOException e) + { + LOGGER.warn("Could not open client properties file '" + initialPropertiesLocation + "'.", e); + } + + Set<String> propertyNames = new HashSet<>(props.stringPropertyNames()); + propertyNames.removeAll(System.getProperties().stringPropertyNames()); + for (String propName : propertyNames) + { + System.setProperty(propName, props.getProperty(propName)); + } + + } + private ClientProperties() { //No instances Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java Tue Feb 10 16:15:08 2015 @@ -20,6 +20,19 @@ */ package org.apache.qpid.configuration; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.common.QpidProperties; + /** * Centralised record of Qpid common properties. * @@ -27,6 +40,8 @@ package org.apache.qpid.configuration; */ public class CommonProperties { + private static final Logger LOGGER = LoggerFactory.getLogger(CommonProperties.class); + /** * The timeout used by the IO layer for timeouts such as send timeout in IoSender, and the close timeout for IoSender and IoReceiver */ @@ -36,6 +51,45 @@ public class CommonProperties public static final String HANDSHAKE_TIMEOUT_PROP_NAME = "qpid.handshake_timeout"; public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2; + static + { + + Properties props = new Properties(QpidProperties.asProperties()); + String initialProperties = System.getProperty("qpid.common_properties_file"); + URL initialPropertiesLocation = null; + try + { + if (initialProperties == null) + { + initialPropertiesLocation = CommonProperties.class.getClassLoader().getResource("qpid-common.properties"); + } + else + { + initialPropertiesLocation = (new File(initialProperties)).toURI().toURL(); + } + + if (initialPropertiesLocation != null) + { + props.load(initialPropertiesLocation.openStream()); + } + } + catch (MalformedURLException e) + { + LOGGER.warn("Could not open common properties file '"+initialProperties+"'.", e); + } + catch (IOException e) + { + LOGGER.warn("Could not open common properties file '" + initialPropertiesLocation + "'.", e); + } + + Set<String> propertyNames = new HashSet<>(props.stringPropertyNames()); + propertyNames.removeAll(System.getProperties().stringPropertyNames()); + for (String propName : propertyNames) + { + System.setProperty(propName, props.getProperty(propName)); + } + + } private CommonProperties() { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Tue Feb 10 16:15:08 2015 @@ -535,6 +535,8 @@ public class Connection extends Connecti connectionLost.set(true); synchronized (lock) { + log.error(e, "exception: %s", e.getMessage()); + switch (state) { case OPENING: Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java Tue Feb 10 16:15:08 2015 @@ -21,6 +21,7 @@ package org.apache.qpid.transport; import java.net.InetSocketAddress; +import java.util.Collection; /** * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing @@ -30,17 +31,21 @@ import java.net.InetSocketAddress; public interface NetworkTransportConfiguration { // Taken from Socket - Boolean getTcpNoDelay(); + boolean getTcpNoDelay(); // The amount of memory in bytes to allocate to the incoming buffer - Integer getReceiveBufferSize(); + int getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - Integer getSendBufferSize(); + int getSendBufferSize(); InetSocketAddress getAddress(); boolean needClientAuth(); boolean wantClientAuth(); + + Collection<String> getEnabledCipherSuites(); + + Collection<String> getDisabledCipherSuites(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Tue Feb 10 16:15:08 2015 @@ -26,12 +26,15 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLSocket; + import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.util.SystemUtils; public final class IoSender implements Runnable, ByteBufferSender @@ -58,6 +61,12 @@ public final class IoSender implements R private final Thread senderThread; private IoReceiver _receiver; private final String _remoteSocketAddress; + private static final boolean shutdownBroken; + + static + { + shutdownBroken = SystemUtils.isWindows(); + } private volatile Throwable exception = null; @@ -314,6 +323,18 @@ public final class IoSender implements R } } } + + if (!shutdownBroken && !(socket instanceof SSLSocket)) + { + try + { + socket.shutdownOutput(); + } + catch (IOException e) + { + //pass + } + } } public void setReceiver(IoReceiver receiver) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Tue Feb 10 16:15:08 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.transport.networ import java.net.SocketAddress; import java.nio.channels.SocketChannel; import java.security.Principal; +import java.util.Collection; import java.util.Set; import javax.net.ssl.SSLContext; @@ -61,7 +62,10 @@ public class NonBlockingConnection imple final SSLContext sslContext, final boolean wantClientAuth, final boolean needClientAuth, - final Runnable onTransportEncryptionAction, final SelectorThread selectorThread) + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites, + final Runnable onTransportEncryptionAction, + final SelectorThread selectorThread) { _socketChannel = socketChannel; _timeout = timeout; @@ -69,10 +73,20 @@ public class NonBlockingConnection imple _selector = selectorThread; _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this, - delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); + delegate, + receiveBufferSize, + ticker, + encryptionSet, + sslContext, + wantClientAuth, + needClientAuth, + enabledCipherSuites, + disabledCipherSuites, + onTransportEncryptionAction); } + public Ticker getTicker() { return _ticker; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Tue Feb 10 16:15:08 2015 @@ -152,6 +152,8 @@ public class NonBlockingNetworkTransport _sslContext, _config.wantClientAuth(), _config.needClientAuth(), + _config.getEnabledCipherSuites(), + _config.getDisabledCipherSuites(), new Runnable() { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Tue Feb 10 16:15:08 2015 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.security.Principal; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -87,6 +88,8 @@ public class NonBlockingSenderReceiver final SSLContext sslContext, final boolean wantClientAuth, final boolean needClientAuth, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites, final Runnable onTransportEncryptionAction) { _connection = connection; @@ -112,6 +115,8 @@ public class NonBlockingSenderReceiver _sslEngine = _sslContext.createSSLEngine(); _sslEngine.setUseClientMode(false); SSLUtil.removeSSLv3Support(_sslEngine); + SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); + if(needClientAuth) { _sslEngine.setNeedClientAuth(true); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java Tue Feb 10 16:15:08 2015 @@ -187,7 +187,10 @@ public class SSLReceiver implements Byte } catch(SSLException e) { - log.error(e, "Error caught in SSLReceiver"); + if (log.isDebugEnabled()) + { + log.debug(e, "Error caught in SSLReceiver"); + } _sslStatus.setSslErrorFlag(); synchronized(_sslStatus.getSslLock()) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java Tue Feb 10 16:15:08 2015 @@ -142,7 +142,7 @@ public class SSLSender implements ByteBu public void send(ByteBuffer appData) { - if (closed.get()) + if (closed.get() && !_sslStatus.getSslErrorFlag()) { throw new SenderException("SSL Sender is closed"); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java Tue Feb 10 16:15:08 2015 @@ -24,6 +24,9 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.URL; import java.security.GeneralSecurityException; import java.security.KeyStore; @@ -33,7 +36,10 @@ import java.security.cert.CertificatePar import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -266,7 +272,35 @@ public class SSLUtil return ks; } - public static void removeSSLv3Support(final SSLEngine engine) + private static interface SSLEntity + { + String[] getEnabledCipherSuites(); + + void setEnabledCipherSuites(String[] strings); + + String[] getEnabledProtocols(); + + void setEnabledProtocols(String[] protocols); + + String[] getSupportedCipherSuites(); + + String[] getSupportedProtocols(); + } + + private static SSLEntity asSSLEntity(final Object object, final Class<?> clazz) + { + return (SSLEntity) Proxy.newProxyInstance(SSLEntity.class.getClassLoader(), new Class[] { SSLEntity.class }, new InvocationHandler() + { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable + { + Method delegateMethod = clazz.getMethod(method.getName(), method.getParameterTypes()); + return delegateMethod.invoke(object, args); + } + }) ; + } + + private static void removeSSLv3Support(final SSLEntity engine) { List<String> enabledProtocols = Arrays.asList(engine.getEnabledProtocols()); if(enabledProtocols.contains(SSLV3_PROTOCOL)) @@ -277,26 +311,61 @@ public class SSLUtil } } - public static void removeSSLv3Support(final SSLSocket socket) + public static void removeSSLv3Support(final SSLEngine engine) { - List<String> enabledProtocols = Arrays.asList(socket.getEnabledProtocols()); - if(enabledProtocols.contains(SSLV3_PROTOCOL)) - { - List<String> allowedProtocols = new ArrayList<>(enabledProtocols); - allowedProtocols.remove(SSLV3_PROTOCOL); - socket.setEnabledProtocols(allowedProtocols.toArray(new String[allowedProtocols.size()])); - } + removeSSLv3Support(asSSLEntity(engine, SSLEngine.class)); } + public static void removeSSLv3Support(final SSLSocket socket) + { + removeSSLv3Support(asSSLEntity(socket, SSLSocket.class)); + } public static void removeSSLv3Support(final SSLServerSocket socket) { - List<String> enabledProtocols = Arrays.asList(socket.getEnabledProtocols()); - if(enabledProtocols.contains(SSLV3_PROTOCOL)) + removeSSLv3Support(asSSLEntity(socket, SSLServerSocket.class)); + } + + private static void updateEnabledCipherSuites(final SSLEntity entity, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites) + { + if(enabledCipherSuites != null && !enabledCipherSuites.isEmpty()) { - List<String> allowedProtocols = new ArrayList<>(enabledProtocols); - allowedProtocols.remove(SSLV3_PROTOCOL); - socket.setEnabledProtocols(allowedProtocols.toArray(new String[allowedProtocols.size()])); + final Set<String> supportedSuites = + new HashSet<>(Arrays.asList(entity.getSupportedCipherSuites())); + supportedSuites.retainAll(enabledCipherSuites); + entity.setEnabledCipherSuites(supportedSuites.toArray(new String[supportedSuites.size()])); + } + + if(disabledCipherSuites != null && !disabledCipherSuites.isEmpty()) + { + final Set<String> enabledSuites = new HashSet<>(Arrays.asList(entity.getEnabledCipherSuites())); + enabledSuites.removeAll(disabledCipherSuites); + entity.setEnabledCipherSuites(enabledSuites.toArray(new String[enabledSuites.size()])); } + + } + + + public static void updateEnabledCipherSuites(final SSLEngine engine, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites) + { + updateEnabledCipherSuites(asSSLEntity(engine, SSLEngine.class), enabledCipherSuites, disabledCipherSuites); + } + + public static void updateEnabledCipherSuites(final SSLServerSocket socket, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites) + { + updateEnabledCipherSuites(asSSLEntity(socket, SSLServerSocket.class), enabledCipherSuites, disabledCipherSuites); + } + + public static void updateEnabledCipherSuites(final SSLSocket socket, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites) + { + updateEnabledCipherSuites(asSSLEntity(socket, SSLSocket.class), enabledCipherSuites, disabledCipherSuites); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/pom.xml URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/pom.xml?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/pom.xml (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/pom.xml Tue Feb 10 16:15:08 2015 @@ -74,6 +74,7 @@ <dollar.sign>$</dollar.sign> <at.sign>@</at.sign> <bdb-version>5.0.104</bdb-version> + <derby-version>10.11.1.1</derby-version> </properties> <modules> Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/etc/config-systests.json URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/etc/config-systests.json?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/etc/config-systests.json (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/etc/config-systests.json Tue Feb 10 16:15:08 2015 @@ -21,7 +21,7 @@ { "name": "Broker", "defaultVirtualHost" : "test", - "modelVersion": "2.1", + "modelVersion": "3.0", "authenticationproviders" : [ { "name" : "plain", "type" : "PlainPasswordFile", @@ -29,12 +29,12 @@ } ], "keystores" : [ { "name" : "systestsKeyStore", - "path" : "${QPID_HOME}${file.separator}..${file.separator}test-profiles${file.separator}test_resources${file.separator}ssl${file.separator}java_broker_keystore.jks", + "storeUrl" : "${QPID_HOME}${file.separator}..${file.separator}test-profiles${file.separator}test_resources${file.separator}ssl${file.separator}java_broker_keystore.jks", "password" : "password" } ], "truststores" : [ { "name" : "systestsTrustStore", - "path" : "${QPID_HOME}${file.separator}..${file.separator}test-profiles${file.separator}test_resources${file.separator}ssl${file.separator}java_broker_truststore.jks", + "storeUrl" : "${QPID_HOME}${file.separator}..${file.separator}test-profiles${file.separator}test_resources${file.separator}ssl${file.separator}java_broker_truststore.jks", "password" : "password" } ], "ports" : [ { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java Tue Feb 10 16:15:08 2015 @@ -20,48 +20,77 @@ */ package org.apache.qpid.client.session; -import java.util.Collections; - import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; -import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.AMQBindingURL; public class QueueDeclareTest extends QpidBrokerTestCase { private Connection _connection; private AMQSession<?, ?> _session; - protected void setUp() throws Exception { super.setUp(); _connection = getConnection(); + _connection.start(); _session = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED); } public void testDeclareAndBindWhenQueueIsNotSpecifiedInDestinationUrl() throws Exception { - AMQQueue destination = new AMQQueue(new AMQBindingURL("topic://amq.topic//?routingkey='testTopic'")); + AMQDestination destination = (AMQDestination) _session.createQueue("topic://amq.topic//?routingkey='testTopic'"); - assertEquals("Queue name is generated in parser", AMQShortString.EMPTY_STRING, destination.getAMQQueueName()); + assertEquals("Non empty queue name unexpectedly generated by parser : " + destination.getAMQQueueName(), AMQShortString.EMPTY_STRING, destination.getAMQQueueName()); - _session.declareAndBind(destination, FieldTable.convertToFieldTable(Collections.<String, Object> emptyMap())); + _session.declareAndBind(destination); - assertFalse("Unexpected queue name: [" + destination.getAMQQueueName() + "]", AMQShortString.EMPTY_STRING.equals(destination.getAMQQueueName())); + assertFalse("Non empty queue name should have been generated by declareAndBind", + AMQShortString.EMPTY_STRING.equals(destination.getAMQQueueName())); sendMessage(_session, destination, 1); + receiveMessage(destination); + } + + public void testDeclareIgnoresNonDurableFlagIfDurableQueueAlreadyExists() throws Exception + { + String format = "direct://amq.direct//%s?durable='%s'"; + AMQDestination durable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), true)); + AMQDestination nondurable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), false)); + + verifyDurabiltyIgnoreIfQueueExists(durable, nondurable); + } + + public void testDeclareIgnoresDurableFlagIfNonDurableQueueAlreadyExists() throws Exception + { + String format = "direct://amq.direct//%s?durable='%s'"; + AMQDestination nondurable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), false)); + AMQDestination durable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), true)); + verifyDurabiltyIgnoreIfQueueExists(nondurable, durable); + } + + private void verifyDurabiltyIgnoreIfQueueExists(final AMQDestination firstDeclare, final AMQDestination secondDeclare) throws Exception + { + _session.declareAndBind(firstDeclare); + + sendMessage(_session, firstDeclare, 1); + + _session.declareAndBind(secondDeclare); + receiveMessage(secondDeclare); + } + + private void receiveMessage(final Destination destination) throws Exception + { MessageConsumer consumer = _session.createConsumer(destination); - _connection.start(); - Message message = consumer.receive(1000l); + Message message = consumer.receive(RECEIVE_TIMEOUT); assertNotNull("Message not received", message); _session.commit(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java Tue Feb 10 16:15:08 2015 @@ -112,7 +112,9 @@ public class SSLTest extends QpidBrokerT } catch (JMSException e) { - assertTrue("Unexpected exception message", e.getMessage().contains("Unrecognized SSL message, plaintext connection?")); + // PASS + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Unrecognized SSL message, plaintext connection?")); } } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java Tue Feb 10 16:15:08 2015 @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.logging; -import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; @@ -306,54 +305,6 @@ public class ChannelLoggingTest extends validateChannelClose(results); } - public void testChannelClosedOnQueueArgumentsMismatch() throws Exception - { - assertLoggingNotYetOccured(CHANNEL_PREFIX); - - Connection connection = getConnection(); - - // Create a session and then close it - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - waitForMessage("CHN-1001"); - - String testQueueName = getTestQueueName(); - - Queue nonDurableQueue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName - + "?durable='false'"); - - ((AMQSession<?,?>)session).declareAndBind((AMQDestination)nonDurableQueue); - - Queue durableQueue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName - + "?durable='true'"); - try - { - ((AMQSession<?,?>)session).declareAndBind((AMQDestination) durableQueue); - fail("Exception not thrown"); - } - catch (AMQChannelClosedException acce) - { - // pass - } - catch (Exception e) - { - fail("Wrong exception thrown " + e); - } - waitForMessage("CHN-1003"); - - List<String> results = findMatches(CHANNEL_PREFIX); - assertTrue("No CHN messages logged", results.size() > 0); - - String closeLog = results.get(results.size() -1); - int closeMessageID = closeLog.indexOf("CHN-1003"); - assertFalse("CHN-1003 is not found", closeMessageID == -1); - - String closeMessage = closeLog.substring(closeMessageID); - assertTrue("Unexpected close channel message :" + closeMessage, Pattern.matches(CHANNEL_CLOSE_FORCED_MESSAGE_PATTERN, closeMessage)); - - session.close(); - connection.close(); - } - public void testChannelClosedOnExclusiveQueueDeclaredOnDifferentSession() throws Exception { assertLoggingNotYetOccured(CHANNEL_PREFIX); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java Tue Feb 10 16:15:08 2015 @@ -203,7 +203,7 @@ public class ExternalAuthenticationTest //add the peersOnly store to the config Map<String, Object> sslTrustStoreAttributes = new HashMap<String, Object>(); sslTrustStoreAttributes.put(TrustStore.NAME, peerStoreName); - sslTrustStoreAttributes.put(FileTrustStore.PATH, BROKER_PEERSTORE); + sslTrustStoreAttributes.put(FileTrustStore.STORE_URL, BROKER_PEERSTORE); sslTrustStoreAttributes.put(FileTrustStore.PASSWORD, BROKER_PEERSTORE_PASSWORD); sslTrustStoreAttributes.put(FileTrustStore.PEERS_ONLY, true); getBrokerConfiguration().addObjectConfiguration(TrustStore.class, sslTrustStoreAttributes); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java Tue Feb 10 16:15:08 2015 @@ -211,47 +211,6 @@ public class ManagementLoggingTest exten } /** - * Description: - * Using the default configuration with SSL enabled for the management port the SSL Keystore path should be reported via MNG-1006 - * Input: - * Management SSL enabled default configuration. - * Output: - * - * <date> MESSAGE MNG-1006 : Using SSL Keystore : test_resources/ssl/keystore.jks - * - * Validation Steps: - * - * 1. The MNG ID is correct - * 2. The keystore path is as specified in the configuration - */ - public void testManagementStartupSSLKeystore() throws Exception - { - if (isJavaBroker()) - { - setSystemProperty("javax.net.debug", "ssl"); - startBrokerAndCreateMonitor(true, true); - - List<String> results = waitAndFindMatches("MNG-1006"); - - assertTrue("MNGer message not logged", results.size() > 0); - - String log = getLogMessage(results, 0); - - //1 - validateMessageID("MNG-1006", log); - - // Validate we only have two MNG-1002 (one via stdout, one via log4j) - results = findMatches("MNG-1006"); - assertEquals("Upexpected SSL Keystore message count", - 1, results.size()); - - // Validate the keystore path is as expected - assertTrue("SSL Keystore entry expected.:" + getMessageString(log), - getMessageString(log).endsWith("systestsKeyStore")); - } - } - - /** * Description: Tests the management connection open/close are logged correctly. * * Output: Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java?rev=1658748&r1=1658747&r2=1658748&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java Tue Feb 10 16:15:08 2015 @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.management.plugin.servlet.rest.RestServlet; import org.apache.qpid.server.model.AccessControlProvider; import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.State; @@ -183,7 +184,7 @@ public class AccessControlProviderRestTe getRestTestHelper().setUsernameAndPassword(BrokerOptions.MANAGEMENT_MODE_USER_NAME, MANAGEMENT_MODE_PASSWORD); - Map<String, Object> acl = getRestTestHelper().getJsonAsSingletonList("accesscontrolprovider/" + TestBrokerConfiguration.ENTRY_NAME_ACL_FILE); + Map<String, Object> acl = getRestTestHelper().getJsonAsSingletonList("accesscontrolprovider/" + TestBrokerConfiguration.ENTRY_NAME_ACL_FILE + "?" + RestServlet.OVERSIZE_PARAM + "=" + (file.getAbsolutePath().length()+10)); assertEquals("Unexpected id", id.toString(), acl.get(AccessControlProvider.ID)); assertEquals("Unexpected path", file.getAbsolutePath() , acl.get(FileAccessControlProviderConstants.PATH)); assertEquals("Unexpected state", State.ERRORED.name() , acl.get(AccessControlProvider.STATE)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org