qpid-jms git commit: QPIDJMS-279 Test for NettyTcpTransport honoring useEpoll option
Repository: qpid-jms Updated Branches: refs/heads/master 6c2a7c144 -> adbdb85f8 QPIDJMS-279 Test for NettyTcpTransport honoring useEpoll option Handle derived Netty transport types. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/adbdb85f Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/adbdb85f Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/adbdb85f Branch: refs/heads/master Commit: adbdb85f8a8f1743e6bce05923f46c6eee17e25f Parents: 6c2a7c1 Author: Timothy BishAuthored: Mon Apr 3 19:20:27 2017 -0400 Committer: Timothy Bish Committed: Mon Apr 3 19:20:27 2017 -0400 -- .../transports/netty/NettyTcpTransportTest.java| 17 - 1 file changed, 16 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/adbdb85f/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java -- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java index 7183ecf..ce9b40a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java @@ -512,7 +512,22 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } private void assertEpoll(String message, boolean expected, Transport transport) throws Exception { -Field group = transport.getClass().getDeclaredField("group"); +Field group = null; +Class transportType = transport.getClass(); + +while (transportType != null && group == null) { +try { +group = transportType.getDeclaredField("group"); +} catch (NoSuchFieldException error) { +transportType = transportType.getSuperclass(); +if (Object.class.equals(transportType)) { +transportType = null; +} +} +} + +assertNotNull("Transport implementation unknown", group); + group.setAccessible(true); if (expected) { assertTrue(message, group.get(transport) instanceof EpollEventLoopGroup); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
qpid-jms git commit: QPIDJMS-279 Test for NettyTcpTransport honoring useEpoll option
Repository: qpid-jms Updated Branches: refs/heads/master a2fd5f165 -> 6c2a7c144 QPIDJMS-279 Test for NettyTcpTransport honoring useEpoll option Add a test that will run on platforms that support Epoll to check that the transport honors the useEpoll TransportOption. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6c2a7c14 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6c2a7c14 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6c2a7c14 Branch: refs/heads/master Commit: 6c2a7c144df265185ce1579ee723d2f789c48386 Parents: a2fd5f1 Author: Timothy BishAuthored: Mon Apr 3 16:27:13 2017 -0400 Committer: Timothy Bish Committed: Mon Apr 3 16:27:13 2017 -0400 -- .../transports/netty/NettyTcpTransportTest.java | 59 1 file changed, 59 insertions(+) -- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c2a7c14/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java -- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java index 2f42dd8..7183ecf 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java @@ -22,8 +22,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -40,6 +42,8 @@ import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; /** * Test basic functionality of the Netty based TCP transport. @@ -462,6 +466,61 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } } +@Test(timeout = 60 * 1000) +public void testConnectToServerWithEpollEnabled() throws Exception { +doTestEpollSupport(true); +} + +@Test(timeout = 60 * 1000) +public void testConnectToServerWithEpollDisabled() throws Exception { +doTestEpollSupport(false); +} + +private void doTestEpollSupport(boolean useEpoll) throws Exception { +assumeTrue(Epoll.isAvailable()); + +try (NettyEchoServer server = createEchoServer(createServerOptions())) { +server.start(); + +int port = server.getServerPort(); +URI serverLocation = new URI("tcp://localhost:" + port); + +TransportOptions options = createClientOptions(); +options.setUseEpoll(useEpoll); + +Transport transport = createTransport(serverLocation, testListener, options); +try { +transport.connect(null); +LOG.info("Connected to server:{} as expected.", serverLocation); +} catch (Exception e) { +fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); +} + +assertTrue(transport.isConnected()); +assertEquals(serverLocation, transport.getRemoteLocation()); +assertEpoll("Transport should be using Epoll", useEpoll, transport); + +transport.close(); + +// Additional close should not fail or cause other problems. +transport.close(); +} + +assertTrue(!transportClosed); // Normal shutdown does not trigger the event. +assertTrue(exceptions.isEmpty()); +assertTrue(data.isEmpty()); +} + +private void assertEpoll(String message, boolean expected, Transport transport) throws Exception { +Field group = transport.getClass().getDeclaredField("group"); +group.setAccessible(true); +if (expected) { +assertTrue(message, group.get(transport) instanceof EpollEventLoopGroup); +} else { +assertFalse(message, group.get(transport) instanceof EpollEventLoopGroup); +} +} + protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) { if (listener == null) { return new NettyTcpTransport(serverLocation, options); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For
[1/2] qpid-jms git commit: QPIDJMS-283 Pipeline transaction discharge and declare
Repository: qpid-jms Updated Branches: refs/heads/master 74eed4cfc -> a2fd5f165 http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a2fd5f16/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java -- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 65c6a26..488675f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -1966,15 +1966,20 @@ public class TestAmqpPeer implements AutoCloseable public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState) { -remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed."); +remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.", false, null); +} + +public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean pipelinedDeclare, Binary nextTxnId) +{ +remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.", pipelinedDeclare, nextTxnId); } public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, Symbol errorType, String errorMessage) { -remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, errorType, errorMessage); +remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, errorType, errorMessage, false, null); } -public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage) { +public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage, boolean pipelinedDeclare, Binary nextTxnId) { // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, // and reply with given response and settled disposition to indicate the outcome. Discharge discharge = new Discharge(); @@ -1986,6 +1991,15 @@ public class TestAmqpPeer implements AutoCloseable expectTransfer(dischargeMatcher, nullValue(), false, false, null, false); +if (pipelinedDeclare) { +Declare declare = new Declare(); + +TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); +declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(declare)); + +expectTransfer(declareMatcher, nullValue(), false, false, new Declared().setTxnId(nextTxnId), true); +} + remotelyCloseLastCoordinatorLink(expectDetachResponse, closed, errorType, errorMessage); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a2fd5f16/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java -- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java index 35ceb6a..a9c646b 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java @@ -84,6 +84,7 @@ public class AmqpTestSupport extends QpidJmsTestSupport { "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() + "=" + getSocketBufferSize() + +"=true" + "=" + getIOBufferSize()); connector.setName("amqp"); if (isAmqpDiscovery()) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-283 Pipeline transaction discharge and declare
QPIDJMS-283 Pipeline transaction discharge and declare Pipeline the discharge of one transaction and the declare of the next one when instructed that a new transaction is requested Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a2fd5f16 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a2fd5f16 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a2fd5f16 Branch: refs/heads/master Commit: a2fd5f165e67a322aa1c94c70edeff3599753379 Parents: 74eed4c Author: Timothy BishAuthored: Mon Apr 3 14:32:14 2017 -0400 Committer: Timothy Bish Committed: Mon Apr 3 14:32:14 2017 -0400 -- .../java/org/apache/qpid/jms/JmsConnection.java | 16 +- .../qpid/jms/JmsLocalTransactionContext.java| 78 +++- .../qpid/jms/meta/JmsAbstractResourceId.java| 4 +- .../apache/qpid/jms/meta/JmsTransactionId.java | 4 + .../org/apache/qpid/jms/provider/Provider.java | 22 +- .../qpid/jms/provider/ProviderWrapper.java | 8 +- .../qpid/jms/provider/amqp/AmqpProvider.java| 8 +- .../qpid/jms/provider/amqp/AmqpSession.java | 12 +- .../provider/amqp/AmqpTransactionContext.java | 199 ++ .../amqp/AmqpTransactionCoordinator.java| 111 -- .../jms/provider/failover/FailoverProvider.java | 8 +- .../TransactionsIntegrationTest.java| 359 ++- .../qpid/jms/provider/ProviderWrapperTest.java | 12 +- .../failover/FailoverProviderClosedTest.java| 4 +- .../qpid/jms/provider/mock/MockProvider.java| 4 +- .../qpid/jms/test/testpeer/TestAmqpPeer.java| 20 +- .../qpid/jms/support/AmqpTestSupport.java | 1 + 17 files changed, 630 insertions(+), 240 deletions(-) -- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a2fd5f16/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java -- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 8294570..0fc9dcd 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -747,18 +747,18 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } } -void commit(JmsTransactionInfo transactionInfo) throws JMSException { -commit(transactionInfo, null); +void commit(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId) throws JMSException { +commit(transactionInfo, nextTransactionId, null); } -void commit(JmsTransactionInfo transactionInfo, ProviderSynchronization synchronization) throws JMSException { +void commit(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { -provider.commit(transactionInfo, request); +provider.commit(transactionInfo, nextTransactionId, request); request.sync(); } finally { requests.remove(request); @@ -768,18 +768,18 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } } -void rollback(JmsTransactionInfo transactionInfo) throws JMSException { -rollback(transactionInfo, null); +void rollback(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId) throws JMSException { +rollback(transactionInfo, nextTransactionId, null); } -void rollback(JmsTransactionInfo transactionInfo, ProviderSynchronization synchronization) throws JMSException { +void rollback(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); try { ProviderFuture request = new ProviderFuture(synchronization); requests.put(request, request); try { -provider.rollback(transactionInfo, request); +provider.rollback(transactionInfo, nextTransactionId, request); request.sync(); } finally { requests.remove(request); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a2fd5f16/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java -- diff --git
svn commit: r1789991 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src: main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0
Author: lquack Date: Mon Apr 3 13:28:24 2017 New Revision: 1789991 URL: http://svn.apache.org/viewvc?rev=1789991=rev Log: QPID-7659: [Java Broker] Fix Link stealing in AMQP 1.0 Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1789991=1789990=1789991=diff == --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Mon Apr 3 13:28:24 2017 @@ -19,14 +19,19 @@ package org.apache.qpid.server.protocol.v1_0; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.ExecutionException; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.BaseSource; import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; @@ -48,10 +53,12 @@ public class LinkImpl _thiefQueue = new LinkedList<>(); private volatile LinkEndpoint_linkEndpoint; private volatile S _source; private volatile T _target; +private boolean _stealingInProgress; public LinkImpl(final String remoteContainerId, final String linkName, final Role role, final LinkRegistry linkRegistry) { @@ -68,7 +75,7 @@ public class LinkImpl> attach(final Session_1_0 session, final Attach attach) +public final synchronized ListenableFuture> attach(final Session_1_0 session, final Attach attach) { try { @@ -77,10 +84,12 @@ public class LinkImpl> future = SettableFuture.create(); +_thiefQueue.add(new ThiefInformation(session, attach, future)); +startLinkStealingIfNecessary(); +return future; } else { @@ -101,64 +110,6 @@ public class LinkImpl> stealLink(final Session_1_0 session, final Attach attach) -{ -final SettableFuture> returnFuture = SettableFuture.create(); -_linkEndpoint.getSession().doOnIOThreadAsync( -() -> -{ -_linkEndpoint.close(new Error(LinkError.STOLEN, - String.format("Link is being stolen by connection '%s'", - session.getConnection(; -try -{ -returnFuture.set(attach(session, attach).get()); -} -catch (InterruptedException e) -{ -returnFuture.setException(e); -Thread.currentThread().interrupt(); -} -catch (ExecutionException e) -{ -returnFuture.setException(e.getCause()); -} -}); -return returnFuture; -} - -private LinkEndpoint createLinkEndpoint(final Session_1_0 session, final Attach attach) -{ -final LinkEndpointlinkEndpoint; -if (_role == Role.SENDER) -{ -linkEndpoint = (LinkEndpoint) new SendingLinkEndpoint(session, (LinkImpl) this); -} -else if (attach.getTarget() instanceof Coordinator) -{ -linkEndpoint = (LinkEndpoint ) new TxnCoordinatorReceivingLinkEndpoint(session, (LinkImpl) this); -} -else -{ -linkEndpoint = (LinkEndpoint ) new StandardReceivingLinkEndpoint(session, (LinkImpl) this); -} -return linkEndpoint; -} - -private ListenableFuture> rejectLink(final Session_1_0 session, Throwable t) -{ -if (t instanceof AmqpErrorException) -{ -_linkEndpoint = new ErrantLinkEndpoint<>(this, session, ((AmqpErrorException) t).getError()); -} -else -{ -_linkEndpoint
svn commit: r1789938 - /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Author: rgodfrey Date: Mon Apr 3 09:59:09 2017 New Revision: 1789938 URL: http://svn.apache.org/viewvc?rev=1789938=rev Log: QPID-7732 : Fix resolution of global addresses to local nodes Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1789938=1789937=1789938=diff == --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Apr 3 09:59:09 2017 @@ -1265,7 +1265,7 @@ public abstract class AbstractVirtualHos MessageSource messageSource = _systemNodeSources.get(name); if(messageSource == null) { -messageSource = awaitChildClassToAttainState(Queue.class, name); +messageSource = getAttainedChildFromAddress(Queue.class, name); } if(messageSource == null) { @@ -1410,7 +1410,7 @@ public abstract class AbstractVirtualHos { if(address.startsWith(domain + "/")) { -child = awaitChildClassToAttainState(childClass, address.substring(domain.length())); +child = awaitChildClassToAttainState(childClass, address.substring(domain.length()+1)); if(child != null) { break; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org