qpid-jms git commit: QPIDJMS-434 Safe iteration over consumers that can close during acknowledge
Repository: qpid-jms Updated Branches: refs/heads/master a9f0ad020 -> 2186300e8 QPIDJMS-434 Safe iteration over consumer that can close during Ensure that we safely iterate over the consumers when processing a client acknowledge to ensure that deferred closures don't cause a ConcurrentModificationException which will error out the acknowledge operation. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2186300e Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2186300e Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2186300e Branch: refs/heads/master Commit: 2186300e879e1dc235e67d7b197c55b46618dc2b Parents: a9f0ad0 Author: Timothy Bish Authored: Tue Nov 20 17:06:05 2018 -0500 Committer: Timothy Bish Committed: Tue Nov 20 17:06:05 2018 -0500 -- .../qpid/jms/provider/amqp/AmqpSession.java | 5 +- .../integration/ConsumerIntegrationTest.java| 80 +++- 2 files changed, 83 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2186300e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java -- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 83f12f7..c71a485 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -71,7 +71,10 @@ public class AmqpSession extends AmqpAbstractResource i * controls the acknowledgement that is applied to each message. */ public void acknowledge(final ACK_TYPE ackType) { -for (AmqpConsumer consumer : consumers.values()) { +// A consumer whose close was deferred will be closed and removed from the consumers +// map so we must copy the entries to safely traverse the collection during this operation. 
+List consumers = new ArrayList<>(this.consumers.values()); +for (AmqpConsumer consumer : consumers) { consumer.acknowledge(ackType); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2186300e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java -- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index fbc0e41..7be79eb 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -1590,7 +1590,85 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { } @Test(timeout=2) -public void testConsumerWithDeferredCloseActsAsClosed() throws Exception { +public void testCloseConsumersWithDeferredAckHandledLaterWhenlastConsumedMessageIsAcked() throws Exception { +try (TestAmqpPeer testPeer = new TestAmqpPeer();) { +final int DEFAULT_PREFETCH = 10; + +// Set to fixed known value to reduce breakage if defaults are changed. 
+Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH); +connection.start(); + +testPeer.expectBegin(); + +Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); +Queue queue = session.createQueue(getTestName()); + +testPeer.expectReceiverAttach(); +testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-1"), +1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 1, true); + +testPeer.expectReceiverAttach(); +testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-2"), +1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 2, true); + +final CountDownLatch expected = new CountDownLatch(2); +((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() { +@Override +public void onInboundMessage(JmsInboundMessageDispatch envelope) { +expected.countDown(); +} +}); + +// These are our two consumers, the first gets a message and abandons it,
qpid-proton-j git commit: PROTON-1965: also apply optimisation when the compared span is within the current array
Repository: qpid-proton-j Updated Branches: refs/heads/master 243dd0dd7 -> b3a66b741 PROTON-1965: also apply optimisation when the compared span is within the current array Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/b3a66b74 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/b3a66b74 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/b3a66b74 Branch: refs/heads/master Commit: b3a66b741c34b40b059fc023b3c7650720dfc001 Parents: 243dd0d Author: Robbie Gemmell Authored: Tue Nov 20 16:45:42 2018 + Committer: Robbie Gemmell Committed: Tue Nov 20 16:45:42 2018 + -- .../proton/codec/CompositeReadableBuffer.java | 10 +++- .../codec/CompositeReadableBufferTest.java | 62 .../codec/CompositeReadableBufferBenchmark.java | 11 3 files changed, 82 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/b3a66b74/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java -- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java index 409b192..5310faf 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java @@ -103,6 +103,7 @@ public class CompositeReadableBuffer implements ReadableBuffer { return currentArray != null && (contents == null || contents.size() == 1); } +@Override public int capacity() { return capacity; } @@ -840,7 +841,14 @@ public class CompositeReadableBuffer implements ReadableBuffer { return false; } -if (hasArray()) { +if (remaining == 0) { +// No content to compare, and we already checked 'remaining' is equal. Protects from NPE below. 
+return true; +} + +if (hasArray() || remaining <= currentArray.length - currentOffset) { +// Either there is only one array, or the span to compare is within a single chunk of this buffer, +// allowing the compare to directly access the underlying array instead of using slower get methods. return equals(currentArray, currentOffset, remaining, buffer); } else { return equals(this, buffer); http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/b3a66b74/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java -- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java index 5325f17..84c63f7 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java @@ -3696,6 +3696,68 @@ public class CompositeReadableBufferTest { assertEquals(0, buffer2.position()); } +@Test +public void testEqualsWhenContentRemainingIsSubsetOfSingleChunkInMultiArrayBufferSame() { +CompositeReadableBuffer buffer1 = new CompositeReadableBuffer(); +CompositeReadableBuffer buffer2 = new CompositeReadableBuffer(); + +byte[] data1 = new byte[] {-1, -1, 0, 1, 2, 3, 4, 5}; +byte[] data2 = new byte[] {-1, -1, -1, 0, 1, 2, 3, 4, 5}; + +buffer1.append(data1); +buffer1.position(2); + +buffer2.append(data2); +buffer2.position(3); + +byte[] data3 = new byte[] { 5, 4, 3, 2, 1 }; +buffer1.append(data3); +buffer2.append(data3); + +buffer1.limit(data1.length); +buffer2.limit(data2.length); + +assertEquals(6, buffer1.remaining()); +assertEquals(6, buffer2.remaining()); + +assertEquals(buffer1, buffer2); +assertEquals(buffer2, buffer1); + +assertEquals(2, buffer1.position()); +assertEquals(3, buffer2.position()); +} + +@Test +public void testEqualsWhenContentRemainingIsSubsetOfSingleChunkInMultiArrayBufferNotSame() { 
+CompositeReadableBuffer buffer1 = new CompositeReadableBuffer(); +CompositeReadableBuffer buffer2 = new CompositeReadableBuffer(); + +byte[] data1 = new byte[] {-1, -1, 0, 1, 2, 3, 4, 5}; +byte[] data2 = new byte[] {-1, -1, -1, 0, 1, 2, 3, 4, -1}; + +buffer1.append(data1); +buffer1.position(2); + +buffer2.append(data2); +buffer2.position(3); + +
qpid-proton git commit: PROTON-1887: [c] fix signed/unsigned warning
Repository: qpid-proton Updated Branches: refs/heads/master 0bdba37da -> d1d55df20 PROTON-1887: [c] fix signed/unsigned warning Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d1d55df2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d1d55df2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d1d55df2 Branch: refs/heads/master Commit: d1d55df2044941121c582bc7ac38d06b594cef52 Parents: 0bdba37 Author: Alan Conway Authored: Tue Nov 20 09:44:18 2018 -0500 Committer: Alan Conway Committed: Tue Nov 20 09:44:18 2018 -0500 -- c/tests/connection_driver_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1d55df2/c/tests/connection_driver_test.cpp -- diff --git a/c/tests/connection_driver_test.cpp b/c/tests/connection_driver_test.cpp index 175a77d..62b60ff 100644 --- a/c/tests/connection_driver_test.cpp +++ b/c/tests/connection_driver_test.cpp @@ -310,7 +310,7 @@ void send_receive_message(const std::string , pn_test::driver_pair ) { pn_delivery_t *sd = pn_delivery(l, pn_dtag(tag.data(), tag.size())); d.server.handler.delivery = NULL; CHECK(pn_delivery_current(sd)); -CHECK(tag.size() == pn_link_send(l, tag.data(), tag.size())); +CHECK(ssize_t(tag.size()) == pn_link_send(l, tag.data(), tag.size())); pn_delivery_settle(sd); d.run(); pn_delivery_t *rd = d.server.handler.delivery; @@ -318,7 +318,7 @@ void send_receive_message(const std::string , pn_test::driver_pair ) { CHECKED_IF(rd) { CHECK(pn_delivery_current(rd)); std::string rbuf(tag.size() * 2, 'x'); - CHECK(tag.size() == + CHECK(ssize_t(tag.size()) == pn_link_recv(pn_delivery_link(rd), [0], rbuf.size())); rbuf.resize(tag.size()); CHECK(tag == rbuf); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
qpid-dispatch git commit: DISPATCH-1168 Fix popup info being stale after adding new edge router or client
Repository: qpid-dispatch Updated Branches: refs/heads/master 9b44162b8 -> 386587e10 DISPATCH-1168 Fix popup info being stale after adding new edge router or client Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/386587e1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/386587e1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/386587e1 Branch: refs/heads/master Commit: 386587e10bfa74c48a57f91c0ef1387d09f66ffc Parents: 9b44162 Author: Ernest Allen Authored: Tue Nov 20 09:34:16 2018 -0500 Committer: Ernest Allen Committed: Tue Nov 20 09:34:16 2018 -0500 -- console/stand-alone/plugin/js/topology/nodes.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/386587e1/console/stand-alone/plugin/js/topology/nodes.js -- diff --git a/console/stand-alone/plugin/js/topology/nodes.js b/console/stand-alone/plugin/js/topology/nodes.js index 411f942..66afd88 100644 --- a/console/stand-alone/plugin/js/topology/nodes.js +++ b/console/stand-alone/plugin/js/topology/nodes.js @@ -135,7 +135,9 @@ export class Node { return nodeProperties[this.nodeType].radius; } uid(srv) { -return this.uuid ? this.uuid : srv.utilities.nameFromId(this.key); +if (!this.uuid) + this.uuid = srv.utilities.nameFromId(this.key); +return this.normals ? `${this.uuid}-${this.normals.length}` : this.uuid; } } const nodeProperties = { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
qpid-proton-j git commit: PROTON-1965: Optimize CompositeReadableBuffer::equals for the single-array case.
Repository: qpid-proton-j Updated Branches: refs/heads/master 459b84d44 -> 243dd0dd7 PROTON-1965: Optimize CompositeReadableBuffer::equals for the single-array case. PR from Francesco Nigro with fixup from Robbie. This closes #20. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/243dd0dd Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/243dd0dd Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/243dd0dd Branch: refs/heads/master Commit: 243dd0dd77f228300957ccec74c8fff27e9db3cd Parents: 459b84d Author: Francesco Nigro Authored: Tue Nov 20 11:58:31 2018 + Committer: Robbie Gemmell Committed: Tue Nov 20 11:58:31 2018 + -- .../proton/codec/CompositeReadableBuffer.java | 32 +-- .../codec/CompositeReadableBufferBenchmark.java | 88 2 files changed, 113 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/243dd0dd/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java -- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java index 1441dcb..409b192 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java @@ -834,22 +834,40 @@ public class CompositeReadableBuffer implements ReadableBuffer { return false; } -ReadableBuffer buffer = (ReadableBuffer)other; -if (this.remaining() != buffer.remaining()) { +ReadableBuffer buffer = (ReadableBuffer) other; +final int remaining = remaining(); +if (remaining != buffer.remaining()) { return false; } -final int origPos = position(); +if (hasArray()) { +return equals(currentArray, currentOffset, remaining, buffer); +} else { +return equals(this, buffer); +} +} + +private static boolean equals(byte[] buffer, int start, int 
length, ReadableBuffer other) { +final int position = other.position(); +for (int i = 0; i < length; i++) { +if (buffer[start + i] != other.get(position + i)) { +return false; +} +} +return true; +} + +private static boolean equals(ReadableBuffer buffer, ReadableBuffer other) { +final int origPos = buffer.position(); try { -for (int i = buffer.position(); hasRemaining(); i++) { -if (!equals(this.get(), buffer.get(i))) { +for (int i = other.position(); buffer.hasRemaining(); i++) { +if (!equals(buffer.get(), other.get(i))) { return false; } } - return true; } finally { -position(origPos); +buffer.position(origPos); } } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/243dd0dd/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferBenchmark.java -- diff --git a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferBenchmark.java new file mode 100644 index 000..4d58a68 --- /dev/null +++ b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferBenchmark.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.codec; + +import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.profile.GCProfiler; +import