Repository: qpid-jms Updated Branches: refs/heads/master 0c0342090 -> 74eed4cfc
QPIDJMS-282: add a check to the resource builder to fire the open handling and send resulting output if the remote open was already received Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/74eed4cf Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/74eed4cf Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/74eed4cf Branch: refs/heads/master Commit: 74eed4cfc95ab7ab5b15dda5fa70d94608c76f66 Parents: 0c03420 Author: Robert Gemmell <rob...@apache.org> Authored: Fri Mar 31 17:52:51 2017 +0100 Committer: Robert Gemmell <rob...@apache.org> Committed: Fri Mar 31 17:52:51 2017 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 19 ++++++++++ .../amqp/builders/AmqpResourceBuilder.java | 17 ++++++++- .../integration/ConnectionIntegrationTest.java | 34 +++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 40 +++++++++++++++++--- 4 files changed, 102 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index b571244..068beee 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -821,6 +821,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } + public void scheduleExecuteAndPump(Runnable task) { + serializer.execute(new Runnable() { + @Override + public void run() { + try { + try { + task.run(); + } finally { + pumpToProtonTransport(); + } + } catch (Throwable t) { + LOG.warn("Caught problem during task processing: {}", t.getMessage(), t); + + fireProviderException(t); + } + } + }); + } + /** * Callback method for the Transport to report that the underlying connection * has closed. When called this method will queue a new task that will check for http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index 69d5f04..b244cfe 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -32,6 +32,7 @@ import org.apache.qpid.jms.provider.amqp.AmqpResourceParent; import org.apache.qpid.jms.provider.amqp.AmqpSupport; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,11 +80,13 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex // Create the resource object now resource = createResource(parent, resourceInfo, endpoint); + AmqpProvider provider = parent.getProvider(); + if (getRequestTimeout() > JmsConnectionInfo.INFINITE) { // Attempt to schedule a cancellation of the pending open request, can return // null if there is no configured request timeout. - requestTimeoutTask = parent.getProvider().scheduleRequestTimeout(new AsyncResult() { + requestTimeoutTask = provider.scheduleRequestTimeout(new AsyncResult() { @Override public void onSuccess() { @@ -92,7 +95,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex @Override public void onFailure(Throwable result) { - handleClosed(parent.getProvider(), result); + handleClosed(provider, result); } @Override @@ -102,6 +105,16 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex }, getRequestTimeout(), this); } + + // Check it wasn't already opened, if it is then handle it + if (endpoint.getRemoteState() != EndpointState.UNINITIALIZED) { + provider.scheduleExecuteAndPump(new Runnable() { + @Override + public void run() { + handleOpened(provider); + } + }); + } } //----- Event handlers ---------------------------------------------------// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 76c1443..d8c713f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -732,4 +732,38 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } } } + + @Test(timeout = 20000) + public void testConnectionWithPreemptiveServerOpen() throws Exception { + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // Ensure the Connection awaits a ClientID being set or not, giving time for the preemptive server Open + String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true"; + + testPeer.expectSaslAnonymousWithServerAmqpHeaderSentPreemptively(); + testPeer.sendPreemptiveServerAmqpHeader(); + testPeer.sendPreemptiveServerOpenFrame(); + // Then expect the clients header to arrive, but defer responding since the servers was already sent. + testPeer.expectHeader(AmqpHeader.HEADER, null); + + ConnectionFactory factory = new JmsConnectionFactory(uri); + Connection connection = factory.createConnection(); + + // Then expect the clients Open frame to arrive, but defer responding since the servers was already sent + // before the clients AMQP connection open is provoked. + testPeer.expectOpen(null, null, true); + testPeer.expectBegin(); + + Thread.sleep(10); // Gives a little more time for the preemptive Open to actually arrive. + + // Use the connection to provoke the Open + connection.setClientID("client-id"); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index f6ebc7a..65c6a26 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -363,6 +363,17 @@ public class TestAmqpPeer implements AutoCloseable _driverRunnable.sendBytes(header); } + public void sendPreemptiveServerAmqpHeader() { + // Arrange to send the AMQP header after the previous handler + CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); + comp.add(new AmqpPeerRunnable() { + @Override + public void run() { + sendHeader(AmqpHeader.HEADER); + } + }); + } + public void sendEmptyFrame(boolean deferWrite) { sendFrame(FrameType.AMQP, 0, null, null, deferWrite, 0); @@ -438,7 +449,8 @@ public class TestAmqpPeer implements AutoCloseable addHandler(new HeaderHandlerImpl(header, response)); } - private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, boolean sendSaslHeaderResponse) + private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, + boolean sendSaslHeaderResponse, boolean amqpHeaderSentPreemptively) { SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism); byte[] saslHeaderResponse = null; @@ -477,7 +489,10 @@ public class TestAmqpPeer implements AutoCloseable addHandler(saslInitMatcher); - addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); + if (!amqpHeaderSentPreemptively) + { + addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); + } } public void expectSaslPlain(String username, String password) @@ -490,7 +505,7 @@ public class TestAmqpPeer implements AutoCloseable Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data)); - expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true); + expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true, false); } public void expectSaslExternal() @@ -500,7 +515,7 @@ public class TestAmqpPeer implements AutoCloseable throw new IllegalStateException("need-client-cert must be enabled on the test peer"); } - expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true); + expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true, false); } public void expectSaslAnonymous() @@ -510,14 +525,19 @@ public class TestAmqpPeer implements AutoCloseable public void expectSaslAnonymous(Matcher<?> hostnameMatcher) { - expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true); + expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true, false); } public void expectSaslAnonymousWithPreEmptiveServerHeader() { assertThat("Peer should be created with instruction to send preemptively", _driverRunnable.isSendSaslHeaderPreEmptively(), equalTo(true)); boolean sendSaslHeaderResponse = false; // Must arrange for the server to have already sent it preemptively - expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse); + expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse, false); + } + + public void expectSaslAnonymousWithServerAmqpHeaderSentPreemptively() + { + expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, true, true); } public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech) @@ -598,6 +618,14 @@ public class TestAmqpPeer implements AutoCloseable expectOpen(desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false); } + public void sendPreemptiveServerOpenFrame() { + // Arrange to send the Open frame after the previous handler + OpenFrame open = createOpenFrame(); + + CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); + comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null)); + } + public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities, Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org