(activemq) branch activemq-5.18.x updated: AMQ-9504 - Add missing license header
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new a12f03009 AMQ-9504 - Add missing license header a12f03009 is described below commit a12f030090bdf593b4c7bbaf10cd2d553f14bf89 Author: Christopher L. Shannon AuthorDate: Wed May 22 10:11:24 2024 -0400 AMQ-9504 - Add missing license header (cherry picked from commit 527d245831fb95fa3a25180ecf404d5a316f2425) --- .../bugs/MultiKahaDBMultipleFilteredAdapterTest.java | 16 1 file changed, 16 insertions(+) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java index 1d0111d6c..ae6973945 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals;
(activemq) branch activemq-6.1.x updated: AMQ-9504 - Add missing license header
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-6.1.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-6.1.x by this push: new ba3d395fc AMQ-9504 - Add missing license header ba3d395fc is described below commit ba3d395fc3f077f41381ad801f2a898caebb1eaa Author: Christopher L. Shannon AuthorDate: Wed May 22 10:11:24 2024 -0400 AMQ-9504 - Add missing license header (cherry picked from commit 527d245831fb95fa3a25180ecf404d5a316f2425) --- .../bugs/MultiKahaDBMultipleFilteredAdapterTest.java | 16 1 file changed, 16 insertions(+) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java index 0ff9e9f2d..b292ba39f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals;
(activemq) branch main updated: AMQ-9504 - Add missing license header
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 527d24583 AMQ-9504 - Add missing license header 527d24583 is described below commit 527d245831fb95fa3a25180ecf404d5a316f2425 Author: Christopher L. Shannon AuthorDate: Wed May 22 10:11:24 2024 -0400 AMQ-9504 - Add missing license header --- .../bugs/MultiKahaDBMultipleFilteredAdapterTest.java | 16 1 file changed, 16 insertions(+) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java index 0ff9e9f2d..b292ba39f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals;
(activemq) branch activemq-5.18.x updated: AMQ-9504 - Prevent registering duplicate mKahadb adapters
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new c71965176 AMQ-9504 - Prevent registering duplicate mKahadb adapters c71965176 is described below commit c71965176ac4acec78779fbc626c98fc310603e1 Author: Christopher L. Shannon AuthorDate: Wed May 22 09:22:01 2024 -0400 AMQ-9504 - Prevent registering duplicate mKahadb adapters This fixes an issue on start up of a broker that is configured with multiple mKahaDB filtered adapters and one is configured with perDestination=true. Before this fix a duplicate persistence adapter could be created because the filter did not check for existing matches. Patch applied with thanks to Ritesh Adval (cherry picked from commit ddfb36515c0e9588d2e322365f56a3f53fb094ad) --- .../kahadb/MultiKahaDBPersistenceAdapter.java | 19 +-- .../MultiKahaDBMultipleFilteredAdapterTest.java| 157 + 2 files changed, 168 insertions(+), 8 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index d852c1525..3f33d48f6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import javax.transaction.xa.Xid; import org.apache.activemq.broker.BrokerService; @@ -88,7 +89,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem }; final DelegateDestinationMap destinationMap = new DelegateDestinationMap(); -List adapters = new 
CopyOnWriteArrayList(); +List adapters = new CopyOnWriteArrayList<>(); private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB"); MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this); @@ -383,16 +384,18 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException { -FileFilter destinationNames = new FileFilter() { -@Override -public boolean accept(File file) { -return file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); -} -}; +FileFilter destinationNames = file -> +file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); + File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames); if (candidates != null) { +Set existing = adapters.stream().map(PersistenceAdapter::getDirectory).collect( +Collectors.toSet()); for (File candidate : candidates) { -registerExistingAdapter(template, candidate); +if(!existing.contains(candidate)) { +LOG.debug("Adapter does not exist for dir: {} so will register it", candidate); +registerExistingAdapter(template, candidate); +} } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java new file mode 100644 index 0..1d0111d6c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java @@ -0,0 +1,157 @@ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Set; +import java.util.stream.Collectors; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import 
javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +
(activemq) branch activemq-6.1.x updated: AMQ-9504 - Prevent registering duplicate mKahadb adapters
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-6.1.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-6.1.x by this push: new d719df230 AMQ-9504 - Prevent registering duplicate mKahadb adapters d719df230 is described below commit d719df23007393660547b58c9ff17c3d0bd72f8a Author: Christopher L. Shannon AuthorDate: Wed May 22 09:22:01 2024 -0400 AMQ-9504 - Prevent registering duplicate mKahadb adapters This fixes an issue on start up of a broker that is configured with multiple mKahaDB filtered adapters and one is configured with perDestination=true. Before this fix a duplicate persistence adapter could be created because the filter did not check for existing matches. Patch applied with thanks to Ritesh Adval (cherry picked from commit ddfb36515c0e9588d2e322365f56a3f53fb094ad) --- .../kahadb/MultiKahaDBPersistenceAdapter.java | 19 +-- .../MultiKahaDBMultipleFilteredAdapterTest.java| 157 + 2 files changed, 168 insertions(+), 8 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index d852c1525..3f33d48f6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import javax.transaction.xa.Xid; import org.apache.activemq.broker.BrokerService; @@ -88,7 +89,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem }; final DelegateDestinationMap destinationMap = new DelegateDestinationMap(); -List adapters = new 
CopyOnWriteArrayList(); +List adapters = new CopyOnWriteArrayList<>(); private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB"); MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this); @@ -383,16 +384,18 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException { -FileFilter destinationNames = new FileFilter() { -@Override -public boolean accept(File file) { -return file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); -} -}; +FileFilter destinationNames = file -> +file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); + File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames); if (candidates != null) { +Set existing = adapters.stream().map(PersistenceAdapter::getDirectory).collect( +Collectors.toSet()); for (File candidate : candidates) { -registerExistingAdapter(template, candidate); +if(!existing.contains(candidate)) { +LOG.debug("Adapter does not exist for dir: {} so will register it", candidate); +registerExistingAdapter(template, candidate); +} } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java new file mode 100644 index 0..0ff9e9f2d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java @@ -0,0 +1,157 @@ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import 
java.util.Set; +import java.util.stream.Collectors; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +impo
(activemq) branch main updated: AMQ-9504 - Prevent registering duplicate mKahadb adapters
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new ddfb36515 AMQ-9504 - Prevent registering duplicate mKahadb adapters ddfb36515 is described below commit ddfb36515c0e9588d2e322365f56a3f53fb094ad Author: Christopher L. Shannon AuthorDate: Wed May 22 09:22:01 2024 -0400 AMQ-9504 - Prevent registering duplicate mKahadb adapters This fixes an issue on start up of a broker that is configured with multiple mKahaDB filtered adapters and one is configured with perDestination=true. Before this fix a duplicate persistence adapter could be created because the filter did not check for existing matches. Patch applied with thanks to Ritesh Adval --- .../kahadb/MultiKahaDBPersistenceAdapter.java | 19 +-- .../MultiKahaDBMultipleFilteredAdapterTest.java| 157 + 2 files changed, 168 insertions(+), 8 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index d852c1525..3f33d48f6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import javax.transaction.xa.Xid; import org.apache.activemq.broker.BrokerService; @@ -88,7 +89,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem }; final DelegateDestinationMap destinationMap = new DelegateDestinationMap(); -List adapters = new CopyOnWriteArrayList(); +List adapters = new CopyOnWriteArrayList<>(); private File directory 
= new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB"); MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this); @@ -383,16 +384,18 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException { -FileFilter destinationNames = new FileFilter() { -@Override -public boolean accept(File file) { -return file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); -} -}; +FileFilter destinationNames = file -> +file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); + File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames); if (candidates != null) { +Set existing = adapters.stream().map(PersistenceAdapter::getDirectory).collect( +Collectors.toSet()); for (File candidate : candidates) { -registerExistingAdapter(template, candidate); +if(!existing.contains(candidate)) { +LOG.debug("Adapter does not exist for dir: {} so will register it", candidate); +registerExistingAdapter(template, candidate); +} } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java new file mode 100644 index 0..0ff9e9f2d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java @@ -0,0 +1,157 @@ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Set; +import java.util.stream.Collectors; +import 
org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.junit.Aft
(activemq) branch activemq-5.18.x updated: AMQ-9481 - Correctly complete async servlet request on timeout
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 827ad1012 AMQ-9481 - Correctly complete async servlet request on timeout 827ad1012 is described below commit 827ad1012b934ca4fff749a5da1a9c1e070a0ee1 Author: Christopher L. Shannon AuthorDate: Sun Apr 21 12:42:20 2024 -0400 AMQ-9481 - Correctly complete async servlet request on timeout This fixes AsyncServletRequest to correctly call context.dispatch() when the async request times out so that the consumer can be re-used. (cherry picked from commit 72befc14fbb69c24bdec0c7d4a1002da8874380d) --- .../src/test/java/org/apache/activemq/web/RestTest.java | 17 + .../apache/activemq/web/async/AsyncServletRequest.java | 8 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 2bbea2270..aa5cbdb63 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -86,11 +86,20 @@ public class RestTest extends JettyTestSupport { HttpClient httpClient = new HttpClient(); httpClient.start(); -final StringBuffer buf = new StringBuffer(); -final Future result = -asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue", buf); +// AMQ-9330 - test no 500 error on timeout and instead 204 error +Future result = +asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=2000=queue=test", new StringBuffer()); +// try a second request while the first is running, this should get a 500 error since the first is still running and +// concurrent access to the same consumer is not allowed +Future errorResult = 
asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=1&type=queue&clientId=test", new StringBuffer()); +assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, errorResult.get().getResponse().getStatus()); +//After the original request finishes, verify 204 and not 500 error +assertEquals(HttpStatus.NO_CONTENT_204, result.get().getResponse().getStatus()); -//Test timeout, no message was sent +// AMQ-9481 - test to make sure we can re-use the consumer after timeout by trying again and ensuring +// no 500 error. Before the fix in AMQ-9418 this would fail even after the previous request timed out +result = +asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=1000&type=queue&clientId=test", new StringBuffer()); assertEquals(HttpStatus.NO_CONTENT_204, result.get().getResponse().getStatus()); } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java index 2598332e4..bd9f462ce 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java @@ -115,10 +115,10 @@ public class AsyncServletRequest implements AsyncListener { } final AsyncContext context = event.getAsyncContext(); -if (context != null) { -// We must call complete and then set the status code to prevent a 500 -// error. The spec requires a 500 error on timeout unless complete() is called. -context.complete(); +if (context != null && event.getSuppliedRequest().isAsyncStarted()) { +// We must call dispatch to finish the request on timeout. +// then set the status code to prevent a 500 error. +context.dispatch(); final ServletResponse response = context.getResponse(); if (response instanceof HttpServletResponse) { ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_NO_CONTENT);
(activemq) 01/01: Merge pull request #1206 from apache/AMQ-9841
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git commit 6e6caf7c6060efadc1ba524147e71d9720fcd935 Merge: 6084867b2 72befc14f Author: Christopher L. Shannon AuthorDate: Mon Apr 22 11:01:35 2024 -0400 Merge pull request #1206 from apache/AMQ-9841 AMQ-9481 - Correctly complete async servlet request on timeout .../src/test/java/org/apache/activemq/web/RestTest.java | 17 + .../apache/activemq/web/async/AsyncServletRequest.java | 8 2 files changed, 17 insertions(+), 8 deletions(-)
(activemq) branch main updated (6084867b2 -> 6e6caf7c6)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 6084867b2 [maven-release-plugin] prepare for next development iteration add 72befc14f AMQ-9481 - Correctly complete async servlet request on timeout new 6e6caf7c6 Merge pull request #1206 from apache/AMQ-9841 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../src/test/java/org/apache/activemq/web/RestTest.java | 17 + .../apache/activemq/web/async/AsyncServletRequest.java | 8 2 files changed, 17 insertions(+), 8 deletions(-)
(activemq) branch AMQ-9841 deleted (was 72befc14f)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch AMQ-9841 in repository https://gitbox.apache.org/repos/asf/activemq.git was 72befc14f AMQ-9481 - Correctly complete async servlet request on timeout The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(activemq) branch AMQ-9841 created (now 72befc14f)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch AMQ-9841 in repository https://gitbox.apache.org/repos/asf/activemq.git at 72befc14f AMQ-9481 - Correctly complete async servlet request on timeout This branch includes the following new commits: new 72befc14f AMQ-9481 - Correctly complete async servlet request on timeout The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(activemq) 01/01: AMQ-9481 - Correctly complete async servlet request on timeout
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch AMQ-9841 in repository https://gitbox.apache.org/repos/asf/activemq.git commit 72befc14fbb69c24bdec0c7d4a1002da8874380d Author: Christopher L. Shannon AuthorDate: Sun Apr 21 12:42:20 2024 -0400 AMQ-9481 - Correctly complete async servlet request on timeout This fixes AsyncServletRequest to correctly call context.dispatch() when the async request times out so that the consumer can be re-used. --- .../src/test/java/org/apache/activemq/web/RestTest.java | 17 + .../apache/activemq/web/async/AsyncServletRequest.java | 8 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 24f95d0f5..23c096292 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -86,11 +86,20 @@ public class RestTest extends JettyTestSupport { HttpClient httpClient = new HttpClient(); httpClient.start(); -final StringBuffer buf = new StringBuffer(); -final Future result = -asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue", buf); +// AMQ-9330 - test no 500 error on timeout and instead 204 error +Future result = +asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=2000=queue=test", new StringBuffer()); +// try a second request while the first is running, this should get a 500 error since the first is still running and +// concurrent access to the same consumer is not allowed +Future errorResult = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1=queue=test", new StringBuffer()); +assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, errorResult.get().getResponse().getStatus()); +//After the original request finishes, verify 204 and not 500 
error +assertEquals(HttpStatus.NO_CONTENT_204, result.get().getResponse().getStatus()); -//Test timeout, no message was sent +// AMQ-9481 - test to make sure we can re-use the consumer after timeout by trying again and ensuring +// no 500 error. Before the fix in AMQ-9418 this would fail even after the previous request timed out +result = +asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=1000&type=queue&clientId=test", new StringBuffer()); assertEquals(HttpStatus.NO_CONTENT_204, result.get().getResponse().getStatus()); } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java index 9ff5d3c04..4ba158b45 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java @@ -115,10 +115,10 @@ public class AsyncServletRequest implements AsyncListener { } final AsyncContext context = event.getAsyncContext(); -if (context != null) { -// We must call complete and then set the status code to prevent a 500 -// error. The spec requires a 500 error on timeout unless complete() is called. -context.complete(); +if (context != null && event.getSuppliedRequest().isAsyncStarted()) { +// We must call dispatch to finish the request on timeout. +// then set the status code to prevent a 500 error. +context.dispatch(); final ServletResponse response = context.getResponse(); if (response instanceof HttpServletResponse) { ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_NO_CONTENT);
(activemq) 01/01: Merge pull request #1204 from mattrpav/AMQ-9430
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git commit f6d067a49aa0f7aea18afa7738d60f9d15eeafa9 Merge: 1c3c1289b 1e314d8df Author: Christopher L. Shannon AuthorDate: Thu Apr 11 12:29:19 2024 -0400 Merge pull request #1204 from mattrpav/AMQ-9430 [AMQ-9430] RuntimeConfigPlugin wire in classloader to JAXBContext .../java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(activemq) branch main updated (1c3c1289b -> f6d067a49)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 1c3c1289b Merge pull request #1202 from jbonofre/AMQ-9473 add 1e314d8df [AMQ-9430] RuntimeConfigPlugin wire in classloader to JAXBContext new f6d067a49 Merge pull request #1204 from mattrpav/AMQ-9430 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(activemq) branch activemq-5.18.x updated: AMQ-9330 - Return 204 code when polling empty destinations (#1203)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new dc892d3df AMQ-9330 - Return 204 code when polling empty destinations (#1203) dc892d3df is described below commit dc892d3dfc2f4b7d0402d947f40431ce4e50d92d Author: Christopher L. Shannon AuthorDate: Thu Apr 11 09:59:46 2024 -0400 AMQ-9330 - Return 204 code when polling empty destinations (#1203) Previously a timeout was thrown and complete was not called so Jetty was returning a 500 error (cherry picked from commit 8b6072d03e273b02288e71cac6eef0539c48ea02) (cherry picked from commit 45a1bd54d3e1202d775388cf5b7d63e4c96183a5) --- .../java/org/apache/activemq/web/RestTest.java | 40 +++--- .../activemq/web/async/AsyncServletRequest.java| 13 +++ 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 9319601e9..2bbea2270 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -21,7 +21,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.TextMessage; @@ -52,10 +54,10 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue", buf); -latch.await(); +assertEquals(HttpStatus.OK_200, 
result.get().getResponse().getStatus()); assertEquals("test", buf.toString()); } @@ -66,7 +68,7 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=5000=queue", buf); //Sleep 2 seconds before sending, should still get the response as timeout is 5 seconds @@ -74,7 +76,7 @@ public class RestTest extends JettyTestSupport { producer.send(session.createTextMessage("test")); LOG.info("message sent"); -latch.await(); +assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); assertEquals("test", buf.toString()); } @@ -85,12 +87,11 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue", buf); //Test timeout, no message was sent -latch.await(); -assertTrue(buf.toString().contains("AsyncContext timeout")); +assertEquals(HttpStatus.NO_CONTENT_204, result.get().getResponse().getStatus()); } @Test(timeout = 60 * 1000) @@ -101,13 +102,13 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=5000=queue", buf); producer.send(session.createTextMessage("test")); LOG.info("message sent"); -latch.await(); +assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); assertEquals("test", buf.toString()); } @@ -163,12 +164,11 @@ public class RestTest extends JettyTestSupport { producer.send(message); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + 
"/message/test?readTimeout=1000&type=queue&clientId=test", buf); -latch.await(); -LOG.info("Received: " + buf.toString()); - // assertEquals(200, contentExchange.getResponseStatus()); +assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); +
(activemq) branch main updated: NO-JIRA: fix formatting
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 45a1bd54d NO-JIRA: fix formatting 45a1bd54d is described below commit 45a1bd54d3e1202d775388cf5b7d63e4c96183a5 Author: Christopher L. Shannon AuthorDate: Thu Apr 11 10:02:27 2024 -0400 NO-JIRA: fix formatting --- .../main/java/org/apache/activemq/web/async/AsyncServletRequest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java index 2e169330b..9ff5d3c04 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java @@ -120,7 +120,7 @@ public class AsyncServletRequest implements AsyncListener { // error. The spec requires a 500 error on timeout unless complete() is called. context.complete(); final ServletResponse response = context.getResponse(); -if (response instanceof HttpServletResponse) { +if (response instanceof HttpServletResponse) { ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_NO_CONTENT); } }
(activemq) branch main updated: AMQ-9330 - Return 204 code when polling empty destinations (#1203)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 8b6072d03 AMQ-9330 - Return 204 code when polling empty destinations (#1203) 8b6072d03 is described below commit 8b6072d03e273b02288e71cac6eef0539c48ea02 Author: Christopher L. Shannon AuthorDate: Thu Apr 11 09:59:46 2024 -0400 AMQ-9330 - Return 204 code when polling empty destinations (#1203) Previously a timeout was thrown and complete was not called so Jetty was returning a 500 error --- .../java/org/apache/activemq/web/RestTest.java | 40 +++--- .../activemq/web/async/AsyncServletRequest.java| 13 +++ 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 9cfd9cc7a..24f95d0f5 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -21,7 +21,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import jakarta.jms.TextMessage; @@ -52,10 +54,10 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue", buf); -latch.await(); +assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); assertEquals("test", buf.toString()); } @@ -66,7 +68,7 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final 
StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=5000=queue", buf); //Sleep 2 seconds before sending, should still get the response as timeout is 5 seconds @@ -74,7 +76,7 @@ public class RestTest extends JettyTestSupport { producer.send(session.createTextMessage("test")); LOG.info("message sent"); -latch.await(); +assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); assertEquals("test", buf.toString()); } @@ -85,12 +87,11 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue", buf); //Test timeout, no message was sent -latch.await(); -assertTrue(buf.toString().contains("AsyncContext timeout")); +assertEquals(HttpStatus.NO_CONTENT_204, result.get().getResponse().getStatus()); } @Test(timeout = 60 * 1000) @@ -101,13 +102,13 @@ public class RestTest extends JettyTestSupport { httpClient.start(); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=5000=queue", buf); producer.send(session.createTextMessage("test")); LOG.info("message sent"); -latch.await(); +assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); assertEquals("test", buf.toString()); } @@ -163,12 +164,11 @@ public class RestTest extends JettyTestSupport { producer.send(message); final StringBuffer buf = new StringBuffer(); -final CountDownLatch latch = +final Future result = asyncRequest(httpClient, "http://localhost:; + port + "/message/test?readTimeout=1000=queue=test", buf); -latch.await(); -LOG.info("Received: " + buf.toString()); - // assertEquals(200, contentExchange.getResponseStatus()); 
+assertEquals(HttpStatus.OK_200, result.get().getResponse().getStatus()); +LOG.info("Received: " + buf); assertEquals(correlId, buf.toString()); } httpClient.stop(); @@ -183,11 +183,11 @@ public clas
(activemq) branch activemq-5.18.x updated: AMQ-9475 - ConsumerControl commands should not auto create wildcard dests
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new a37c111a7 AMQ-9475 - ConsumerControl commands should not auto create wildcard dests a37c111a7 is described below commit a37c111a7522ab0307fc2bba877ce820538f5c3c Author: Christopher L. Shannon AuthorDate: Wed Apr 10 11:53:12 2024 -0400 AMQ-9475 - ConsumerControl commands should not auto create wildcard dests This fixes an issue where wildcard destinations could be inadvertently created by ConsumerControl commands which could lead to problems. The processing logic for this command now will only look up existing destinations if they exist. --- .../activemq/broker/region/AbstractRegion.java | 26 +++- .../java/org/apache/activemq/bugs/AMQ9475Test.java | 131 + 2 files changed, 152 insertions(+), 5 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 702b22b15..e62bfed81 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -543,7 +545,11 @@ public abstract class AbstractRegion implements Region { return sub.pullMessage(context, pull); } -protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception { +protected Destination lookup(ConnectionContext context, 
ActiveMQDestination destination, boolean createTemporary) throws Exception { +return lookup(context, destination, createTemporary, true); +} + +protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary, boolean autoCreate) throws Exception { Destination dest = null; destinationsLock.readLock().lock(); @@ -553,7 +559,7 @@ public abstract class AbstractRegion implements Region { destinationsLock.readLock().unlock(); } -if (dest == null) { +if (autoCreate && dest == null) { if (isAutoCreateDestinations()) { // Try to auto create the destination... re-invoke broker // from the @@ -679,8 +685,8 @@ public abstract class AbstractRegion implements Region { @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { -Subscription sub = subscriptions.get(control.getConsumerId()); -if (sub != null && sub instanceof AbstractSubscription) { +final Subscription sub = subscriptions.get(control.getConsumerId()); +if (sub instanceof AbstractSubscription) { ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch()); if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination()); @@ -691,7 +697,17 @@ public abstract class AbstractRegion implements Region { LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()); try { -lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); +final ActiveMQDestination controlDest = Objects.requireNonNull(control.getDestination(), +"Destination must not be null in ConsumerControl"); +// Don't auto create patterns (wildcard topics) or composite, this matches addConsumer() +final boolean autoCreate = !controlDest.isPattern() && !controlDest.isComposite(); + +// If autoCreate is false then lookup() will just return null if the 
destination +// does not exist and we can skip the call to wakeup. This will prevent creating +// wildcard destinations for wildcard consumers but will use them if they exist + Optional.ofNullable(lookup(consumerExchange.getConnectionContext(), +control.getDestination(),false, autoCreate)) +.ifPresent(Destination::wakeup); } catch (Exception e) { LOG.warn("failed
(activemq) 01/01: Merge pull request #1200 from cshannon/AMQ-9475
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git commit 78d95552338db1be2b95fc5c2fdd4eae8d045458 Merge: f90c10df8 c8f0419ff Author: Christopher L. Shannon AuthorDate: Thu Apr 11 06:55:34 2024 -0400 Merge pull request #1200 from cshannon/AMQ-9475 AMQ-9475 - ConsumerControl commands should not auto create wildcard dests .../activemq/broker/region/AbstractRegion.java | 26 +++- .../java/org/apache/activemq/bugs/AMQ9475Test.java | 132 + 2 files changed, 153 insertions(+), 5 deletions(-)
(activemq) branch main updated (f90c10df8 -> 78d955523)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from f90c10df8 Merge pull request #1191 from kartg/remove-assembly-jrms add c8f0419ff AMQ-9475 - ConsumerControl commands should not auto create wildcard dests new 78d955523 Merge pull request #1200 from cshannon/AMQ-9475 The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revision listed as "add" was already present in the repository and has only been added to this reference. Summary of changes: .../activemq/broker/region/AbstractRegion.java | 26 +++- .../java/org/apache/activemq/bugs/AMQ9475Test.java | 132 + 2 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ9475Test.java
(activemq) 01/02: AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 99568ef1fd8366cf57da7ca4ee0862fcaca2c2b7 Author: Christopher L. Shannon AuthorDate: Thu Feb 15 09:39:48 2024 -0500 AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates This commit fixes a bug in KahaDB that caused gaps in sequence ack tracking for durables that would lead to the appearance of stuck messages on durable subs if duplicate messages were detected. The sequence is now correctly rolled back so that there is no gap if the message is not added to the order index (cherry picked from commit 10d94bd16585df7bf9c266958c5e45bb556e8c14) (cherry picked from commit 2b856f4da6de0395619bc5b87366d1400dcbb49d) --- .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../kahadb/KahaDBDurableMessageRecoveryTest.java | 41 ++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index d26c7bd10..00b9bce25 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } metadata.lastUpdate = location; } else { - MessageKeys messageKeys = sd.orderIndex.get(tx, previous); if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt @@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } sd.messageIdIndex.put(tx, command.getMessageId(), previous); 
sd.locationIndex.remove(tx, location); +// ensure sequence is not broken +sd.orderIndex.revertNextMessageId(); id = -1; } } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java index cf22b272a..cea5ca019 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java @@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.Wait; import org.junit.After; @@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest { assertEquals(10, sub2Recovered.get()); } +/** + * AMQ-9453 Validates that the order index doesn't have gaps in sequence + * tracking on duplicates + */ +@Test +public void durableRecoveryDuplicates() throws Exception { +String testTopic = "test.topic.duplicates"; + +Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); +ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic); +MessageProducer producer = session.createProducer(topic); +TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1"); + +final Destination brokerTopic = 
broker.getDestination(topic); +final MessageStore store = brokerTopic.getMessageStore(); +for (int i = 1; i <= 10; i++) { +TextMessage message = session.createTextMessage("msg: " + i); +producer.send(message); +// For each message try to add it twice to the store, the store should detect +// the duplicate and prevent it. This used to break the order index cursor +// which would cause the ack sequences to have gaps so the metrics reported +
(activemq) branch activemq-5.17.x updated (680d56f46 -> 761c5bbde)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git from 680d56f46 [AMQ-9424] Upgrade Jackson 2.16.1 new 99568ef1f AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates new 761c5bbde AMQ-9436 - Ensure message audit in queue store cursor is shared The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../broker/region/cursors/StoreQueueCursor.java| 8 +++-- .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../region/QueueDuplicatesFromStoreTest.java | 15 .../kahadb/KahaDBDurableMessageRecoveryTest.java | 41 ++ 4 files changed, 64 insertions(+), 3 deletions(-)
(activemq) 02/02: AMQ-9436 - Ensure message audit in queue store cursor is shared
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 761c5bbdecde019833031122eb1585e3bf1b2949 Author: Christopher L. Shannon AuthorDate: Thu Feb 15 17:27:52 2024 -0500 AMQ-9436 - Ensure message audit in queue store cursor is shared This commit fixes the initialization of the StoreQueueCursor message audit object to make sure it's shared between the persistent and non persistent cursors. It also adds a check to ensure that duplicate calls to start will not try and init more than once. (cherry picked from commit 75de9321162ae096de4a3c0b5a325865d514e5a0) --- .../activemq/broker/region/cursors/StoreQueueCursor.java | 8 ++-- .../broker/region/QueueDuplicatesFromStoreTest.java | 15 +++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index a7b4c6ec9..7e877fa2d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -53,7 +53,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { @Override public synchronized void start() throws Exception { -started = true; +if (isStarted()) { +return; +} super.start(); if (nonPersistent == null) { if (broker.getBrokerService().isPersistent()) { @@ -76,7 +78,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { @Override public synchronized void stop() throws Exception { -started = false; +if (!isStarted()) { +return; +} if (nonPersistent != null) { nonPersistent.destroy(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 6cef70904..653c0556e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -29,10 +29,13 @@ import javax.management.ObjectName; import junit.framework.TestCase; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.SubscriptionStatistics; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -117,6 +120,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase { queue.initialize(); queue.start(); +// verify that the cursor message audit is created and set with the +// correct audit depth and shared with the persistent and non peristent +// cursors +final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages(); +ActiveMQMessageAudit messageAudit = messages.getMessageAudit(); +assertNotNull(messageAudit); +assertEquals(auditDepth, messageAudit.getAuditDepth()); +assertSame(messageAudit, messages.getPersistent().getMessageAudit()); +assertSame(messageAudit, messages.getNonPersistent().getMessageAudit()); +// Verify calling start again doesn't re-initial the audit +messages.start(); +assertSame(messageAudit, messages.getMessageAudit()); ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); ProducerInfo producerInfo = new ProducerInfo();
(activemq) branch activemq-5.18.x updated: AMQ-9436 - Ensure message audit in queue store cursor is shared
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 4bae9df5b AMQ-9436 - Ensure message audit in queue store cursor is shared 4bae9df5b is described below commit 4bae9df5bdd2b992a716300c70e35dc892f26bd2 Author: Christopher L. Shannon AuthorDate: Thu Feb 15 17:27:52 2024 -0500 AMQ-9436 - Ensure message audit in queue store cursor is shared This commit fixes the initialization of the StoreQueueCursor message audit object to make sure it's shared between the persistent and non persistent cursors. It also adds a check to ensure that duplicate calls to start will not try and init more than once. (cherry picked from commit 75de9321162ae096de4a3c0b5a325865d514e5a0) --- .../activemq/broker/region/cursors/StoreQueueCursor.java | 8 ++-- .../broker/region/QueueDuplicatesFromStoreTest.java | 15 +++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index a7b4c6ec9..7e877fa2d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -53,7 +53,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { @Override public synchronized void start() throws Exception { -started = true; +if (isStarted()) { +return; +} super.start(); if (nonPersistent == null) { if (broker.getBrokerService().isPersistent()) { @@ -76,7 +78,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { @Override public synchronized void stop() throws Exception { -started = false; +if (!isStarted()) { +return; +} if 
(nonPersistent != null) { nonPersistent.destroy(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 6cef70904..653c0556e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -29,10 +29,13 @@ import javax.management.ObjectName; import junit.framework.TestCase; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.SubscriptionStatistics; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -117,6 +120,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase { queue.initialize(); queue.start(); +// verify that the cursor message audit is created and set with the +// correct audit depth and shared with the persistent and non peristent +// cursors +final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages(); +ActiveMQMessageAudit messageAudit = messages.getMessageAudit(); +assertNotNull(messageAudit); +assertEquals(auditDepth, messageAudit.getAuditDepth()); +assertSame(messageAudit, messages.getPersistent().getMessageAudit()); +assertSame(messageAudit, messages.getNonPersistent().getMessageAudit()); +// Verify calling start again doesn't re-initial the audit +messages.start(); +assertSame(messageAudit, messages.getMessageAudit()); ProducerBrokerExchange 
producerExchange = new ProducerBrokerExchange(); ProducerInfo producerInfo = new ProducerInfo();
(activemq) branch activemq-6.0.x updated: AMQ-9436 - Ensure message audit in queue store cursor is shared
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-6.0.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-6.0.x by this push: new 3741f3e7f AMQ-9436 - Ensure message audit in queue store cursor is shared 3741f3e7f is described below commit 3741f3e7fdf02d5233db1bbefd477704f2e0059e Author: Christopher L. Shannon AuthorDate: Thu Feb 15 17:27:52 2024 -0500 AMQ-9436 - Ensure message audit in queue store cursor is shared This commit fixes the initialization of the StoreQueueCursor message audit object to make sure it's shared between the persistent and non persistent cursors. It also adds a check to ensure that duplicate calls to start will not try and init more than once. (cherry picked from commit 75de9321162ae096de4a3c0b5a325865d514e5a0) --- .../activemq/broker/region/cursors/StoreQueueCursor.java | 8 ++-- .../broker/region/QueueDuplicatesFromStoreTest.java | 15 +++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index a7b4c6ec9..7e877fa2d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -53,7 +53,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { @Override public synchronized void start() throws Exception { -started = true; +if (isStarted()) { +return; +} super.start(); if (nonPersistent == null) { if (broker.getBrokerService().isPersistent()) { @@ -76,7 +78,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { @Override public synchronized void stop() throws Exception { -started = false; +if (!isStarted()) { +return; +} if 
(nonPersistent != null) { nonPersistent.destroy(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 038bb4039..dd9bcc5ac 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -29,10 +29,13 @@ import javax.management.ObjectName; import junit.framework.TestCase; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.SubscriptionStatistics; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -117,6 +120,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase { queue.initialize(); queue.start(); +// verify that the cursor message audit is created and set with the +// correct audit depth and shared with the persistent and non peristent +// cursors +final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages(); +ActiveMQMessageAudit messageAudit = messages.getMessageAudit(); +assertNotNull(messageAudit); +assertEquals(auditDepth, messageAudit.getAuditDepth()); +assertSame(messageAudit, messages.getPersistent().getMessageAudit()); +assertSame(messageAudit, messages.getNonPersistent().getMessageAudit()); +// Verify calling start again doesn't re-initial the audit +messages.start(); +assertSame(messageAudit, messages.getMessageAudit()); ProducerBrokerExchange 
producerExchange = new ProducerBrokerExchange(); ProducerInfo producerInfo = new ProducerInfo();
(activemq) 01/01: Merge pull request #1154 from cshannon/AMQ-9436
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git commit 65a6b805a04766ab0523033634c00cc338090217 Merge: 30d54c429 75de93211 Author: Christopher L. Shannon AuthorDate: Thu Feb 15 18:16:02 2024 -0500 Merge pull request #1154 from cshannon/AMQ-9436 AMQ-9436 - Ensure message audit in queue store cursor is shared .../activemq/broker/region/cursors/StoreQueueCursor.java | 8 ++-- .../broker/region/QueueDuplicatesFromStoreTest.java | 15 +++ 2 files changed, 21 insertions(+), 2 deletions(-)
(activemq) branch main updated (30d54c429 -> 65a6b805a)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 30d54c429 Merge pull request #1153 from cshannon/AMQ-9435 add 75de93211 AMQ-9436 - Ensure message audit in queue store cursor is shared new 65a6b805a Merge pull request #1154 from cshannon/AMQ-9436 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../activemq/broker/region/cursors/StoreQueueCursor.java | 8 ++-- .../broker/region/QueueDuplicatesFromStoreTest.java | 15 +++ 2 files changed, 21 insertions(+), 2 deletions(-)
(activemq) branch activemq-5.18.x updated: AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 722a075e2 AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates 722a075e2 is described below commit 722a075e2c4a331c258510618cfec3330f014c73 Author: Christopher L. Shannon AuthorDate: Thu Feb 15 09:39:48 2024 -0500 AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates This commit fixes a bug in KahaDB that caused gaps in sequence ack tracking for durables that would lead to the appearance of stuck messages on durable subs if duplicate messages were detected. The sequence is now correctly rolled back so that there is no gap if the message is not added to the order index (cherry picked from commit 10d94bd16585df7bf9c266958c5e45bb556e8c14) (cherry picked from commit 2b856f4da6de0395619bc5b87366d1400dcbb49d) --- .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../kahadb/KahaDBDurableMessageRecoveryTest.java | 41 ++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index d26c7bd10..00b9bce25 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } metadata.lastUpdate = location; } else { - MessageKeys messageKeys = sd.orderIndex.get(tx, previous); if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we 
ignore this add attempt @@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); +// ensure sequence is not broken +sd.orderIndex.revertNextMessageId(); id = -1; } } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java index cf22b272a..cea5ca019 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java @@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.Wait; import org.junit.After; @@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest { assertEquals(10, sub2Recovered.get()); } +/** + * AMQ-9435 Validates that the order index doesn't have gaps in sequence + * tracking on duplicates + */ +@Test +public void durableRecoveryDuplicates() throws Exception { +String testTopic = "test.topic.duplicates"; + +Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); +ActiveMQTopic topic = (ActiveMQTopic) 
session.createTopic(testTopic); +MessageProducer producer = session.createProducer(topic); +TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1"); + +final Destination brokerTopic = broker.getDestination(topic); +final MessageStore store = brokerTopic.getMessageStore(); +for (int i = 1; i <= 10; i++) { +TextMessage message = session.createTextMessage("msg: " + i); +producer.send(message); +// For each message try to add it twice to the store, the store should dete
(activemq) branch activemq-6.0.x updated: AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-6.0.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-6.0.x by this push: new 2b856f4da AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates 2b856f4da is described below commit 2b856f4da6de0395619bc5b87366d1400dcbb49d Author: Christopher L. Shannon AuthorDate: Thu Feb 15 09:39:48 2024 -0500 AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates This commit fixes a bug in KahaDB that caused gaps in sequence ack tracking for durables that would lead to the appearance of stuck messages on durable subs if duplicate messages were detected. The sequence is now correctly rolled back so that there is no gap if the message is not added to the order index (cherry picked from commit 10d94bd16585df7bf9c266958c5e45bb556e8c14) --- .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../kahadb/KahaDBDurableMessageRecoveryTest.java | 41 ++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index bb28221a9..dbab306fc 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } metadata.lastUpdate = location; } else { - MessageKeys messageKeys = sd.orderIndex.get(tx, previous); if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt @@ -1560,6 +1559,8 @@ public abstract class 
MessageDatabase extends ServiceSupport implements BrokerSe } sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); +// ensure sequence is not broken +sd.orderIndex.revertNextMessageId(); id = -1; } } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java index 58cac05df..041fce238 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java @@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.Wait; import org.junit.After; @@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest { assertEquals(10, sub2Recovered.get()); } +/** + * AMQ-9435 Validates that the order index doesn't have gaps in sequence + * tracking on duplicates + */ +@Test +public void durableRecoveryDuplicates() throws Exception { +String testTopic = "test.topic.duplicates"; + +Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); +ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic); +MessageProducer producer = 
session.createProducer(topic); +TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1"); + +final Destination brokerTopic = broker.getDestination(topic); +final MessageStore store = brokerTopic.getMessageStore(); +for (int i = 1; i <= 10; i++) { +TextMessage message = session.createTextMessage("msg: " + i); +producer.send(message); +// For each message try to add it twice to the store, the store should detect +// the duplicate and prevent it. This used to break the order in
(activemq) 01/01: Merge pull request #1153 from cshannon/AMQ-9435
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git commit 30d54c4299d3bf3573bfcfbb414af2428066acad Merge: afe9e8d1f 10d94bd16 Author: Christopher L. Shannon AuthorDate: Thu Feb 15 10:37:39 2024 -0500 Merge pull request #1153 from cshannon/AMQ-9435 AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../kahadb/KahaDBDurableMessageRecoveryTest.java | 41 ++ 2 files changed, 43 insertions(+), 1 deletion(-)
(activemq) branch main updated (afe9e8d1f -> 30d54c429)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from afe9e8d1f Merge branch 'mattrpav-AMQ-9426' add 10d94bd16 AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates new 30d54c429 Merge pull request #1153 from cshannon/AMQ-9435 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../kahadb/KahaDBDurableMessageRecoveryTest.java | 41 ++ 2 files changed, 43 insertions(+), 1 deletion(-)
(activemq) branch activemq-5.17.x updated: AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new 120813839 AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack 120813839 is described below commit 1208138390ed363f85528ce47449280d994b16cc Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 23 09:33:17 2024 -0500 AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack This adds a check in case a duplicate ack is passed to the store to make sure that the subscription statistics (if enabled) for a durable sub do not have the metrics decremented a second time (cherry picked from commit 918099cb7ee4c46c9a3ea1d418411444dd00fcec) (cherry picked from commit 1fce36b3e50ed7254fbdcfc1c57e339c31b9b5e2) --- .../activemq/store/kahadb/MessageDatabase.java | 18 ++- .../cursors/AbstractPendingMessageCursorTest.java | 2 +- .../store/AbstractMessageStoreSizeStatTest.java| 138 - .../store/AbstractStoreStatTestSupport.java| 13 +- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 22 .../MultiKahaDBMessageStoreSizeStatTest.java | 21 .../memory/MemoryMessageStoreSizeStatTest.java | 6 +- 7 files changed, 181 insertions(+), 39 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 6687c56b4..d26c7bd10 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -3148,18 +3148,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (messageSequence != null) { SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); if (range != null && !range.isEmpty()) { 
-range.remove(messageSequence); +boolean removed = range.remove(messageSequence); if (!range.isEmpty()) { sd.ackPositions.put(tx, subscriptionKey, range); } else { sd.ackPositions.remove(tx, subscriptionKey); } -MessageKeys key = sd.orderIndex.get(tx, messageSequence); -decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, +// Only decrement the statistics if the message was removed +// from the ack set for the subscription +// Fix for AMQ-9420 +if (removed) { +MessageKeys key = sd.orderIndex.get(tx, messageSequence); +decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, key.location.getSize()); +} else { +LOG.warn("Received unexpected duplicate ack: messageId: {}, Sub: {}, Dest: {}", +command.getMessageId(), subscriptionKey, command.getDestination()); +} // Check if the message is reference by any other subscription. +// If removed was previously false then we could return before +// this check as this should always return true (should still be +// a reference) but removed being false is unexpected in the first +// place so this is a good second check to verify. 
if (isSequenceReferenced(tx, sd, messageSequence)) { return; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java index 645fb2ddd..cd92da696 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -533,7 +533,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat return publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize, -publishedMessageSize, false, deliveryMode); +publishedMessageSize, null, false, deliveryMode); } protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStor
(activemq) branch activemq-5.18.x updated: AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 1fce36b3e AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack 1fce36b3e is described below commit 1fce36b3e50ed7254fbdcfc1c57e339c31b9b5e2 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 23 09:33:17 2024 -0500 AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack This adds a check in case a duplicate ack is passed to the store to make sure that the subscription statistics (if enabled) for a durable sub do not have the metrics decremented a second time (cherry picked from commit 918099cb7ee4c46c9a3ea1d418411444dd00fcec) --- .../activemq/store/kahadb/MessageDatabase.java | 18 ++- .../cursors/AbstractPendingMessageCursorTest.java | 2 +- .../store/AbstractMessageStoreSizeStatTest.java| 138 - .../store/AbstractStoreStatTestSupport.java| 13 +- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 22 .../MultiKahaDBMessageStoreSizeStatTest.java | 21 .../memory/MemoryMessageStoreSizeStatTest.java | 6 +- 7 files changed, 181 insertions(+), 39 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 6687c56b4..d26c7bd10 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -3148,18 +3148,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (messageSequence != null) { SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); if (range != null && !range.isEmpty()) { -range.remove(messageSequence); +boolean removed = 
range.remove(messageSequence); if (!range.isEmpty()) { sd.ackPositions.put(tx, subscriptionKey, range); } else { sd.ackPositions.remove(tx, subscriptionKey); } -MessageKeys key = sd.orderIndex.get(tx, messageSequence); -decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, +// Only decrement the statistics if the message was removed +// from the ack set for the subscription +// Fix for AMQ-9420 +if (removed) { +MessageKeys key = sd.orderIndex.get(tx, messageSequence); +decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, key.location.getSize()); +} else { +LOG.warn("Received unexpected duplicate ack: messageId: {}, Sub: {}, Dest: {}", +command.getMessageId(), subscriptionKey, command.getDestination()); +} // Check if the message is reference by any other subscription. +// If removed was previously false then we could return before +// this check as this should always return true (should still be +// a reference) but removed being false is unexpected in the first +// place so this is a good second check to verify. 
if (isSequenceReferenced(tx, sd, messageSequence)) { return; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java index 645fb2ddd..cd92da696 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -533,7 +533,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat return publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize, -publishedMessageSize, false, deliveryMode); +publishedMessageSize, null, false, deliveryMode); } protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apa
(activemq) branch activemq-5.17.x updated: AMQ-9344 - remove jakarta imports and fix compilation on backport
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new 713e662db AMQ-9344 - remove jakarta imports and fix compilation on backport 713e662db is described below commit 713e662dbda965a22f54fb6536833e0efa3d9638 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 23 10:46:29 2024 -0500 AMQ-9344 - remove jakarta imports and fix compilation on backport --- .../org/apache/activemq/broker/TransportConnection.java| 1 - .../activemq/usecases/MaxUncommittedCountExceededTest.java | 14 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 8c6ff28cc..9ac9001e8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -106,7 +106,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import jakarta.jms.ResourceAllocationException; public class TransportConnection implements Connection, Task, CommandVisitor { private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java index 320ae4f11..6ebf71599 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java @@ -42,13 +42,13 @@ import org.junit.rules.TestName; import 
org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import jakarta.jms.Connection; -import jakarta.jms.JMSException; -import jakarta.jms.Message; -import jakarta.jms.MessageProducer; -import jakarta.jms.Queue; -import jakarta.jms.ResourceAllocationException; -import jakarta.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; @RunWith(value = Parameterized.class) public class MaxUncommittedCountExceededTest {
(activemq) branch activemq-5.18.x updated: AMQ-9344 - remove more jakarta imports
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new e9bc27130 AMQ-9344 - remove more jakarta imports e9bc27130 is described below commit e9bc27130d1a0b22f76aa07b7bded06124c476ac Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 23 10:54:12 2024 -0500 AMQ-9344 - remove more jakarta imports --- .../activemq/usecases/MaxUncommittedCountExceededTest.java | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java index 320ae4f11..6ebf71599 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java @@ -42,13 +42,13 @@ import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import jakarta.jms.Connection; -import jakarta.jms.JMSException; -import jakarta.jms.Message; -import jakarta.jms.MessageProducer; -import jakarta.jms.Queue; -import jakarta.jms.ResourceAllocationException; -import jakarta.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; @RunWith(value = Parameterized.class) public class MaxUncommittedCountExceededTest {
(activemq) branch activemq-5.18.x updated: AMQ-9344 - fix compilation on backport
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 1bd897fe5 AMQ-9344 - fix compilation on backport 1bd897fe5 is described below commit 1bd897fe5f056314e659687cd80bc2b320e80029 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 23 10:46:29 2024 -0500 AMQ-9344 - fix compilation on backport --- .../src/main/java/org/apache/activemq/broker/TransportConnection.java| 1 - 1 file changed, 1 deletion(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index b25382be7..ed2fd1f37 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -104,7 +104,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import jakarta.jms.ResourceAllocationException; public class TransportConnection implements Connection, Task, CommandVisitor { private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
(activemq) branch activemq-6.0.x updated: AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-6.0.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-6.0.x by this push: new 918099cb7 AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack 918099cb7 is described below commit 918099cb7ee4c46c9a3ea1d418411444dd00fcec Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 23 09:33:17 2024 -0500 AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack This adds a check in case a duplicate ack is passed to the store to make sure that the subscription statistics (if enabled) for a durable sub do not have the metrics decremented a second time (cherry picked from commit f73cf2aaab026dcddeed7a46b272801a43d95113) (cherry picked from commit a0b8a1fe55b7b309d754dceaf6cde81967c7f78b) --- .../activemq/store/kahadb/MessageDatabase.java | 18 ++- .../cursors/AbstractPendingMessageCursorTest.java | 2 +- .../store/AbstractMessageStoreSizeStatTest.java| 142 - .../store/AbstractStoreStatTestSupport.java| 13 +- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 22 .../MultiKahaDBMessageStoreSizeStatTest.java | 21 +++ .../memory/MemoryMessageStoreSizeStatTest.java | 6 +- 7 files changed, 183 insertions(+), 41 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index c5cb20a25..bb28221a9 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -3148,18 +3148,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (messageSequence != null) { SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); if (range != null && !range.isEmpty()) 
{ -range.remove(messageSequence); +boolean removed = range.remove(messageSequence); if (!range.isEmpty()) { sd.ackPositions.put(tx, subscriptionKey, range); } else { sd.ackPositions.remove(tx, subscriptionKey); } -MessageKeys key = sd.orderIndex.get(tx, messageSequence); -decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, +// Only decrement the statistics if the message was removed +// from the ack set for the subscription +// Fix for AMQ-9420 +if (removed) { +MessageKeys key = sd.orderIndex.get(tx, messageSequence); +decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, key.location.getSize()); +} else { +LOG.warn("Received unexpected duplicate ack: messageId: {}, Sub: {}, Dest: {}", +command.getMessageId(), subscriptionKey, command.getDestination()); +} // Check if the message is reference by any other subscription. +// If removed was previously false then we could return before +// this check as this should always return true (should still be +// a reference) but removed being false is unexpected in the first +// place so this is a good second check to verify. 
if (isSequenceReferenced(tx, sd, messageSequence)) { return; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java index d2ac17b5e..0ddea1ed7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -533,7 +533,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat return publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize, -publishedMessageSize, false, deliveryMode); +publishedMessageSize, null, false, deliveryMode); } protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSiz
(activemq) 01/01: Merge pull request #1142 from cshannon/AMQ-9420
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git commit 38b6d86ff8fb45c6f9795ed1c3cf7046dc521eb2 Merge: 0b95f9344 a0b8a1fe5 Author: Christopher L. Shannon AuthorDate: Tue Jan 23 10:34:28 2024 -0500 Merge pull request #1142 from cshannon/AMQ-9420 AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack .../activemq/store/kahadb/MessageDatabase.java | 18 ++- .../cursors/AbstractPendingMessageCursorTest.java | 2 +- .../store/AbstractMessageStoreSizeStatTest.java| 142 - .../store/AbstractStoreStatTestSupport.java| 13 +- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 22 .../MultiKahaDBMessageStoreSizeStatTest.java | 21 +++ .../memory/MemoryMessageStoreSizeStatTest.java | 6 +- 7 files changed, 183 insertions(+), 41 deletions(-)
(activemq) branch main updated (0b95f9344 -> 38b6d86ff)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 0b95f9344 AMQ-9412: fix docker entrypoint script add f73cf2aaa AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack add a0b8a1fe5 revert unnecessary change new 38b6d86ff Merge pull request #1142 from cshannon/AMQ-9420 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../activemq/store/kahadb/MessageDatabase.java | 18 ++- .../cursors/AbstractPendingMessageCursorTest.java | 2 +- .../store/AbstractMessageStoreSizeStatTest.java| 142 - .../store/AbstractStoreStatTestSupport.java| 13 +- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 22 .../MultiKahaDBMessageStoreSizeStatTest.java | 21 +++ .../memory/MemoryMessageStoreSizeStatTest.java | 6 +- 7 files changed, 183 insertions(+), 41 deletions(-)
(activemq-openwire) branch main updated: OPENWIRE-75 - convert javax to jakarta jms exception
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-openwire.git The following commit(s) were added to refs/heads/main by this push: new c5e9102 OPENWIRE-75 - convert javax to jakarta jms exception c5e9102 is described below commit c5e91020adbf3864d0a920e4c76154285054d792 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 16 16:11:58 2024 -0500 OPENWIRE-75 - convert javax to jakarta jms exception This allows properly translating jms exceptions received when connection to an old 5.x broker that is using javax namespace --- .../openwire/codec/BaseDataStreamMarshaller.java| 7 +-- .../openwire/utils/OpenWireValidationSupport.java | 17 + 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java index 8267538..751e8fb 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.openwire.codec; +import static org.apache.activemq.openwire.utils.OpenWireValidationSupport.convertJmsPackage; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -191,7 +193,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { StackTraceElement ss[] = new StackTraceElement[dataIn.readShort()]; for (int i = 0; i < ss.length; i++) { try { -ss[i] = STACK_TRACE_ELEMENT_CONSTRUCTOR.newInstance(new Object[] { tightUnmarshalString(dataIn, bs), +ss[i] = STACK_TRACE_ELEMENT_CONSTRUCTOR.newInstance(new Object[] { convertJmsPackage(tightUnmarshalString(dataIn, bs)), tightUnmarshalString(dataIn, bs), tightUnmarshalString(dataIn, bs), 
Integer.valueOf(dataIn.readInt()) }); } catch (IOException e) { throw e; @@ -219,6 +221,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { +className = convertJmsPackage(className); Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader()); OpenWireValidationSupport.validateIsThrowable(clazz); Constructor constructor = clazz.getConstructor(String.class); @@ -502,7 +505,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { StackTraceElement ss[] = new StackTraceElement[dataIn.readShort()]; for (int i = 0; i < ss.length; i++) { try { -ss[i] = STACK_TRACE_ELEMENT_CONSTRUCTOR.newInstance(new Object[] { looseUnmarshalString(dataIn), +ss[i] = STACK_TRACE_ELEMENT_CONSTRUCTOR.newInstance(new Object[] { convertJmsPackage(looseUnmarshalString(dataIn)), looseUnmarshalString(dataIn), looseUnmarshalString(dataIn), Integer.valueOf(dataIn.readInt()) }); } catch (IOException e) { throw e; diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireValidationSupport.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireValidationSupport.java index 95e5f62..252f145 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireValidationSupport.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireValidationSupport.java @@ -18,6 +18,9 @@ package org.apache.activemq.openwire.utils; public class OpenWireValidationSupport { +private static final String jmsPackageToReplace = "javax.jms"; +private static final String jmsPackageToUse = "jakarta.jms"; + /** * Verify that the provided class extends {@link Throwable} and throw an * {@link IllegalArgumentException} if it does not. 
@@ -29,4 +32,18 @@ public class OpenWireValidationSupport { throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); } } + +/** + * This method can be used to convert from javax -> jakarta or + * vice versa depending on the version used by the client + * + * @param className + * @return + */ +public static String convertJmsPackage(String className) { +if (className !=
(activemq) branch activemq-5.16.x updated: AMQ-9418 - Support converting jakarta jms exceptions to javax
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.16.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.16.x by this push: new cd201c0f7 AMQ-9418 - Support converting jakarta jms exceptions to javax cd201c0f7 is described below commit cd201c0f7522b2054dfe7babcac0787af99b22ee Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 16 09:17:58 2024 -0500 AMQ-9418 - Support converting jakarta jms exceptions to javax This fixes the OpenWire v12 marshaller so that if an exception is received by a jakarta based broker the client will convert back to a javax jms exception type. (cherry picked from commit b92479cd2ec0913b6dae03a4789bdceae58b57b5) --- .../org/apache/activemq/openwire/OpenWireUtil.java | 18 ++ .../openwire/v12/BaseDataStreamMarshaller.java | 7 +-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java index f52e6c3e0..6ca48131a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -18,6 +18,9 @@ package org.apache.activemq.openwire; public class OpenWireUtil { +private static final String jmsPackageToReplace = "jakarta.jms"; +private static final String jmsPackageToUse = "javax.jms"; + /** * Verify that the provided class extends {@link Throwable} and throw an * {@link IllegalArgumentException} if it does not. 
@@ -29,4 +32,19 @@ public class OpenWireUtil { throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); } } + +/** + * This method can be used to convert from javax -> jakarta or + * vice versa depending on the version used by the client + * + * @param className + * @return + */ +public static String convertJmsPackage(String className) { +if (className != null && className.startsWith(jmsPackageToReplace)) { +return className.replace(jmsPackageToReplace, jmsPackageToUse); +} +return className; +} + } diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java index 10fdebcbc..0b66e72bb 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.openwire.v12; +import static org.apache.activemq.openwire.OpenWireUtil.convertJmsPackage; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -197,7 +199,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {tightUnmarshalString(dataIn, bs), +.newInstance(new Object[] {convertJmsPackage(tightUnmarshalString(dataIn, bs)), tightUnmarshalString(dataIn, bs), tightUnmarshalString(dataIn, bs), Integer.valueOf(dataIn.readInt())}); @@ -227,6 +229,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { +className = convertJmsPackage(className); Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader()); 
OpenWireUtil.validateIsThrowable(clazz); Constructor constructor = clazz.getConstructor(new Class[] {String.class}); @@ -521,7 +524,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {looseUnmarshalString(dataIn), +.newInstance(new Object[] {convertJmsPackage(looseUnmarshalString(dataIn)),
(activemq) branch activemq-5.17.x updated: AMQ-9418 - Support converting jakarta jms exceptions to javax
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new da2753613 AMQ-9418 - Support converting jakarta jms exceptions to javax da2753613 is described below commit da27536132fb99427019b7c43bbb11458c339bc6 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 16 09:17:58 2024 -0500 AMQ-9418 - Support converting jakarta jms exceptions to javax This fixes the OpenWire v12 marshaller so that if an exception is received by a jakarta based broker the client will convert back to a javax jms exception type. (cherry picked from commit b92479cd2ec0913b6dae03a4789bdceae58b57b5) --- .../org/apache/activemq/openwire/OpenWireUtil.java | 18 ++ .../openwire/v12/BaseDataStreamMarshaller.java | 7 +-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java index f52e6c3e0..6ca48131a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -18,6 +18,9 @@ package org.apache.activemq.openwire; public class OpenWireUtil { +private static final String jmsPackageToReplace = "jakarta.jms"; +private static final String jmsPackageToUse = "javax.jms"; + /** * Verify that the provided class extends {@link Throwable} and throw an * {@link IllegalArgumentException} if it does not. 
@@ -29,4 +32,19 @@ public class OpenWireUtil { throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); } } + +/** + * This method can be used to convert from javax -> jakarta or + * vice versa depending on the version used by the client + * + * @param className + * @return + */ +public static String convertJmsPackage(String className) { +if (className != null && className.startsWith(jmsPackageToReplace)) { +return className.replace(jmsPackageToReplace, jmsPackageToUse); +} +return className; +} + } diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java index 10fdebcbc..0b66e72bb 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.openwire.v12; +import static org.apache.activemq.openwire.OpenWireUtil.convertJmsPackage; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -197,7 +199,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {tightUnmarshalString(dataIn, bs), +.newInstance(new Object[] {convertJmsPackage(tightUnmarshalString(dataIn, bs)), tightUnmarshalString(dataIn, bs), tightUnmarshalString(dataIn, bs), Integer.valueOf(dataIn.readInt())}); @@ -227,6 +229,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { +className = convertJmsPackage(className); Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader()); 
OpenWireUtil.validateIsThrowable(clazz); Constructor constructor = clazz.getConstructor(new Class[] {String.class}); @@ -521,7 +524,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {looseUnmarshalString(dataIn), +.newInstance(new Object[] {convertJmsPackage(looseUnmarshalString(dataIn)),
(activemq) branch activemq-6.0.x updated: AMQ-9418 - Support converting javax jms exceptions to jakarta
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-6.0.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-6.0.x by this push: new 7a3db6f1d AMQ-9418 - Support converting javax jms exceptions to jakarta 7a3db6f1d is described below commit 7a3db6f1dc569888a4f8204fda1a713df0eb1e38 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 16 09:17:58 2024 -0500 AMQ-9418 - Support converting javax jms exceptions to jakarta This fixes the OpenWire v12 marshaller so that if an exception is received by a javax based broker the client will convert back to a jakarta jms exception type. (cherry picked from commit be64fdba511803e93312e57040e5776a66cf7be2) --- .../org/apache/activemq/openwire/OpenWireUtil.java | 18 ++ .../openwire/v12/BaseDataStreamMarshaller.java | 7 +-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java index f52e6c3e0..9d0274426 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -18,6 +18,9 @@ package org.apache.activemq.openwire; public class OpenWireUtil { +private static final String jmsPackageToReplace = "javax.jms"; +private static final String jmsPackageToUse = "jakarta.jms"; + /** * Verify that the provided class extends {@link Throwable} and throw an * {@link IllegalArgumentException} if it does not. 
@@ -29,4 +32,19 @@ public class OpenWireUtil { throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); } } + +/** + * This method can be used to convert from javax -> jakarta or + * vice versa depending on the version used by the client + * + * @param className + * @return + */ +public static String convertJmsPackage(String className) { +if (className != null && className.startsWith(jmsPackageToReplace)) { +return className.replace(jmsPackageToReplace, jmsPackageToUse); +} +return className; +} + } diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java index fe43d3cbd..7c59753d3 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.openwire.v12; +import static org.apache.activemq.openwire.OpenWireUtil.convertJmsPackage; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -197,7 +199,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {tightUnmarshalString(dataIn, bs), +.newInstance(new Object[] {convertJmsPackage(tightUnmarshalString(dataIn, bs)), tightUnmarshalString(dataIn, bs), tightUnmarshalString(dataIn, bs), dataIn.readInt()}); @@ -227,6 +229,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { +className = convertJmsPackage(className); Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader()); OpenWireUtil.validateIsThrowable(clazz); Constructor 
constructor = clazz.getConstructor(new Class[] {String.class}); @@ -521,7 +524,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {looseUnmarshalString(dataIn), +.newInstance(new Object[] {convertJmsPackage(looseUnmarshalString(dataIn)),
(activemq) branch main updated: AMQ-9418 - Support converting javax jms exceptions to jakarta
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new be64fdba5 AMQ-9418 - Support converting javax jms exceptions to jakarta new a2bf5b8ad Merge pull request #1141 from cshannon/javax-jakarta-exception-conversion be64fdba5 is described below commit be64fdba511803e93312e57040e5776a66cf7be2 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 16 09:17:58 2024 -0500 AMQ-9418 - Support converting javax jms exceptions to jakarta This fixes the OpenWire v12 marshaller so that if an exception is received by a javax based broker the client will convert back to a jakarta jms exception type. --- .../org/apache/activemq/openwire/OpenWireUtil.java | 18 ++ .../openwire/v12/BaseDataStreamMarshaller.java | 7 +-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java index f52e6c3e0..9d0274426 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -18,6 +18,9 @@ package org.apache.activemq.openwire; public class OpenWireUtil { +private static final String jmsPackageToReplace = "javax.jms"; +private static final String jmsPackageToUse = "jakarta.jms"; + /** * Verify that the provided class extends {@link Throwable} and throw an * {@link IllegalArgumentException} if it does not. 
@@ -29,4 +32,19 @@ public class OpenWireUtil { throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); } } + +/** + * This method can be used to convert from javax -> jakarta or + * vice versa depending on the version used by the client + * + * @param className + * @return + */ +public static String convertJmsPackage(String className) { +if (className != null && className.startsWith(jmsPackageToReplace)) { +return className.replace(jmsPackageToReplace, jmsPackageToUse); +} +return className; +} + } diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java index fe43d3cbd..7c59753d3 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v12/BaseDataStreamMarshaller.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.openwire.v12; +import static org.apache.activemq.openwire.OpenWireUtil.convertJmsPackage; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -197,7 +199,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {tightUnmarshalString(dataIn, bs), +.newInstance(new Object[] {convertJmsPackage(tightUnmarshalString(dataIn, bs)), tightUnmarshalString(dataIn, bs), tightUnmarshalString(dataIn, bs), dataIn.readInt()}); @@ -227,6 +229,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { +className = convertJmsPackage(className); Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader()); OpenWireUtil.validateIsThrowable(clazz); Constructor 
constructor = clazz.getConstructor(new Class[] {String.class}); @@ -521,7 +524,7 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { for (int i = 0; i < ss.length; i++) { try { ss[i] = (StackTraceElement)STACK_TRACE_ELEMENT_CONSTRUCTOR -.newInstance(new Object[] {looseUnmarshalString(dataIn), +.newInstance(new Object[] {convertJmsPackage(looseUnmarshalString(dataIn)), looseUnmarshalString(dataIn),
(activemq) branch activemq-5.18.x updated: AMQ-9418 - Support converting jakarta jms exceptions to javax
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new b92479cd2 AMQ-9418 - Support converting jakarta jms exceptions to javax new ef9c0bd59 Merge pull request #1140 from cshannon/jakarta-javax-exception-conversion b92479cd2 is described below commit b92479cd2ec0913b6dae03a4789bdceae58b57b5 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Jan 16 09:17:58 2024 -0500 AMQ-9418 - Support converting jakarta jms exceptions to javax This fixes the OpenWire v12 marshaller so that if an exception is received by a jakarta based broker the client will convert back to a javax jms exception type. This commit also updates activemq-client-jakarta to do the proper mapping in the reverse. --- activemq-client-jakarta/pom.xml| 52 +- .../org/apache/activemq/openwire/OpenWireUtil.java | 18 .../openwire/v12/BaseDataStreamMarshaller.java | 7 ++- 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/activemq-client-jakarta/pom.xml b/activemq-client-jakarta/pom.xml index 4386fabcd..09b096ee8 100644 --- a/activemq-client-jakarta/pom.xml +++ b/activemq-client-jakarta/pom.xml @@ -108,18 +108,50 @@ replace + + + ${project.build.directory}/copied-sources/activemq-client/**/*.java + + + ${project.build.directory}/copied-sources/activemq-client/**/openwire/OpenWireUtil.java + +javax.jms +jakarta.jms + +MULTILINE + + + + + +jms-package-to-replace +initialize + +replace + + + + ${project.build.directory}/copied-sources/activemq-client/**/openwire/OpenWireUtil.java + +jmsPackageToReplace = "jakarta.jms" +jmsPackageToReplace = "javax.jms" + + + +jms-package-to-use +initialize + +replace + + + + ${project.build.directory}/copied-sources/activemq-client/**/openwire/OpenWireUtil.java + +jmsPackageToUse = "javax.jms" +jmsPackageToUse = "jakarta.jms" + - - - 
${project.build.directory}/copied-sources/activemq-client/**/*.java - -javax.jms -jakarta.jms - -MULTILINE - - maven-resources-plugin diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java index f52e6c3e0..6ca48131a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -18,6 +18,9 @@ package org.apache.activemq.openwire; public class OpenWireUtil { +private static final String jmsPackageToReplace = "jakarta.jms"; +private static final String jmsPackageToUse = "javax.jms"; + /** * Verify that the provided class extends {@link Throwable} and throw an * {@link IllegalArgumentException} if it does not. @@ -29,4 +32,19 @@ public class OpenWireUtil { throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); } } + +/** + * This method can be used to convert from javax -> jakarta or + * vice versa depending on the version used by the client + * + * @param className + * @return + */ +public static String convertJmsPackage(String className) { +if (className != null && className.s
(activemq-openwire) branch main updated: NO-JIRA: Re-run generator to fix PartialCommandMarshaller
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-openwire.git The following commit(s) were added to refs/heads/main by this push: new a2014ce NO-JIRA: Re-run generator to fix PartialCommandMarshaller a2014ce is described below commit a2014ce51cabc021cd69c8314847a00e666e61c2 Author: Christopher L. Shannon (cshannon) AuthorDate: Mon Dec 11 16:45:04 2023 -0500 NO-JIRA: Re-run generator to fix PartialCommandMarshaller --- .../openwire/codec/universal/PartialCommandMarshaller.java| 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/PartialCommandMarshaller.java b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/PartialCommandMarshaller.java index ebe3f66..673348b 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/PartialCommandMarshaller.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/PartialCommandMarshaller.java @@ -62,8 +62,8 @@ public class PartialCommandMarshaller extends BaseDataStreamMarshaller { PartialCommand info = (PartialCommand) target; -info.setData(tightUnmarshalConstByteArray(dataIn, bs, 0)); info.setCommandId(dataIn.readInt()); +info.setData(tightUnmarshalConstByteArray(dataIn, bs, 0)); } /** @@ -99,8 +99,8 @@ public class PartialCommandMarshaller extends BaseDataStreamMarshaller { PartialCommand info = (PartialCommand) source; -tightMarshalByteArray2(info.getData(), dataOut, bs); dataOut.writeInt(info.getCommandId()); +tightMarshalByteArray2(info.getData(), dataOut, bs); } /** @@ -112,8 +112,8 @@ public class PartialCommandMarshaller extends BaseDataStreamMarshaller { PartialCommand info = (PartialCommand) source; super.looseMarshal(wireFormat, source, dataOut); -looseMarshalByteArray(wireFormat, info.getData(), dataOut); dataOut.writeInt(info.getCommandId()); 
+looseMarshalByteArray(wireFormat, info.getData(), dataOut); } /** @@ -129,7 +129,7 @@ public class PartialCommandMarshaller extends BaseDataStreamMarshaller { PartialCommand info = (PartialCommand) target; -info.setData(looseUnmarshalByteArray(dataIn)); info.setCommandId(dataIn.readInt()); +info.setData(looseUnmarshalByteArray(dataIn)); } }
(activemq-openwire) branch main updated: OPENWIRE-69 - Fix PartialCommand and ProducerAck sequence ids (#4)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-openwire.git The following commit(s) were added to refs/heads/main by this push: new db4b306 OPENWIRE-69 - Fix PartialCommand and ProducerAck sequence ids (#4) db4b306 is described below commit db4b306d28e05fb46a9c68346120c60511800f74 Author: Christopher L. Shannon AuthorDate: Thu Dec 7 09:38:16 2023 -0500 OPENWIRE-69 - Fix PartialCommand and ProducerAck sequence ids (#4) PartialCommand and ProducerAck duplicate the same sequence id on more than one property. This commit fixes the sequence and also adds validation to the marshaller to verify that sequences are not duplicated and are contiguous and if they are not an IllegalArgumentException is thrown. --- .../activemq/openwire/commands/PartialCommand.java | 2 +- .../activemq/openwire/commands/ProducerAck.java| 2 +- openwire-generator/pom.xml | 16 +-- .../openwire/generator/GeneratorUtils.java | 8 +- .../generator/OpenWirePropertyDescriptor.java | 2 +- .../openwire/generator/OpenWireTypeDescriptor.java | 52 --- .../generator/OpenWireTypeDescriptorTest.java | 155 + 7 files changed, 202 insertions(+), 35 deletions(-) diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java index 6a69c24..a3e80ca 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java @@ -33,7 +33,7 @@ public class PartialCommand implements Command { @OpenWireProperty(version = 1, sequence = 1) private int commandId; -@OpenWireProperty(version = 1, sequence = 1, mandatory = true) +@OpenWireProperty(version = 1, sequence = 2, mandatory = true) private byte[] data; public PartialCommand() { diff --git 
a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java index e01a187..104a5e9 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java @@ -35,7 +35,7 @@ public class ProducerAck extends BaseCommand { @OpenWireProperty(version = 3, sequence = 1) protected ProducerId producerId; -@OpenWireProperty(version = 3, sequence = 1) +@OpenWireProperty(version = 3, sequence = 2) protected int size; public ProducerAck() { diff --git a/openwire-generator/pom.xml b/openwire-generator/pom.xml index 20464b6..dc0dfba 100644 --- a/openwire-generator/pom.xml +++ b/openwire-generator/pom.xml @@ -52,6 +52,11 @@ ant + + junit + junit + test + org.apache.logging.log4j log4j-slf4j2-impl @@ -59,15 +64,4 @@ - - - -maven-surefire-plugin - - true - - - - - diff --git a/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/GeneratorUtils.java b/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/GeneratorUtils.java index 05b1f40..9333e4b 100644 --- a/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/GeneratorUtils.java +++ b/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/GeneratorUtils.java @@ -68,7 +68,7 @@ public class GeneratorUtils { * * @throws Exception if an error occurs while scanning for properties. */ -public static Set finalOpenWireProperties(Class openWireType) throws Exception { +public static Set finalOpenWireProperties(Class openWireType) { @SuppressWarnings("unchecked") final Set properties = ReflectionUtils.getAllFields(openWireType, ReflectionUtils.withAnnotation(OpenWireProperty.class)); @@ -90,7 +90,7 @@ public class GeneratorUtils { * @throws Exception if an error occurs finding the get method. 
*/ @SuppressWarnings("unchecked") -public static Method findGetMethodForProperty(Class openWireType, OpenWirePropertyDescriptor property) throws Exception { +public static Method findGetMethodForProperty(Class openWireType, OpenWirePropertyDescriptor property) { if (property.getType().equals(boolean.class)) { Set getters = getAllMethods(openWireType, @@ -139,7 +139,7 @@ public class GeneratorUtils { * @throws Exception if an error occurs finding the set method. */ @SuppressWarnings("unchecked") -public static Method findSetMethodForProperty(Class openWireType, OpenWirePrope
(activemq-openwire) branch main updated: OPENWIRE-66 - Bring project build up to date (#3)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-openwire.git The following commit(s) were added to refs/heads/main by this push: new 22a33ee OPENWIRE-66 - Bring project build up to date (#3) 22a33ee is described below commit 22a33eefc19e6c0403bb6530172a8bb9772e649d Author: Christopher L. Shannon AuthorDate: Wed Dec 6 16:19:35 2023 -0500 OPENWIRE-66 - Bring project build up to date (#3) This brings the build up to date by upgrading maven plugins, dependencies and to JDK 17. It also updates to ActiveMQ 6.0.1 for testing and jakarta. This commit is just for the build itself and future updates will add the latest versions of OpenWire. --- openwire-annotations/pom.xml | 2 +- openwire-core/pom.xml | 6 +- openwire-core/src/test/resources/log4j.properties | 41 --- .../src/test/resources/log4j2-test.properties | 54 ++ openwire-generator/pom.xml | 6 +- .../openwire/generator/GeneratorUtils.java | 22 +++--- .../src/main/resources/log4j.properties| 35 - .../src/main/resources/log4j2.properties | 54 ++ openwire-interop-tests/pom.xml | 7 +- .../openwire/codec/MessageCompressionTest.java | 10 +-- .../openwire/codec/OpenWireInteropTestSupport.java | 2 +- .../src/test/resources/log4j.properties| 41 --- .../src/test/resources/log4j2-test.properties | 54 ++ openwire-legacy/pom.xml| 2 +- openwire-website/pom.xml | 6 +- pom.xml| 86 +++--- 16 files changed, 232 insertions(+), 196 deletions(-) diff --git a/openwire-annotations/pom.xml b/openwire-annotations/pom.xml index af29c42..957438f 100644 --- a/openwire-annotations/pom.xml +++ b/openwire-annotations/pom.xml @@ -20,7 +20,7 @@ org.apache.activemq openwire-protocol -1.0-SNAPSHOT +1.0.0-SNAPSHOT openwire-annotations diff --git a/openwire-core/pom.xml b/openwire-core/pom.xml index 8b6d99c..1a0f995 100644 --- a/openwire-core/pom.xml +++ b/openwire-core/pom.xml @@ -20,7 +20,7 @@ org.apache.activemq openwire-protocol 
-1.0-SNAPSHOT +1.0.0-SNAPSHOT openwire-core @@ -54,8 +54,8 @@ test - org.slf4j - slf4j-log4j12 + org.apache.logging.log4j + log4j-slf4j2-impl test diff --git a/openwire-core/src/test/resources/log4j.properties b/openwire-core/src/test/resources/log4j.properties deleted file mode 100644 index 0588275..000 --- a/openwire-core/src/test/resources/log4j.properties +++ /dev/null @@ -1,41 +0,0 @@ -## --- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --- - -# -# The logging properties used during tests.. -# -log4j.rootLogger=INFO, out, stdout - -log4j.logger.org.apache.activemq.openwire=DEBUG - -# Tune the ActiveMQ and it's OpenWire transport as needed for debugging. 
-log4j.logger.org.apache.activemq=INFO -log4j.logger.org.apache.activemq.broker=DEBUG -log4j.logger.org.apache.activemq.transport.openwire=TRACE -log4j.logger.org.apache.activemq.transport.openwire.FRAMES=DEBUG - -# CONSOLE appender not used by default -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n - -# File appender -log4j.appender.out=org.apache.log4j.FileAppender -log4j.appender.out.layout=org.apache.log4j.PatternLayout -log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n -log4j.appender.out.file=target/activemq-test.log -log4j.appender.out.append=true diff --git a/openwire-core/src/test/resources/log4j2-test.properties b/openwire-core/src/test/resources/log4j2-test.properties new file mode
(activemq-openwire) 01/01: Merge pull request #1 from apache/dependabot/maven/junit-junit-4.13.1
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-openwire.git commit 2e6320da26f9e0c8bbab1a2b9bafee9c954201b5 Merge: 8e2baa0 1d8a888 Author: Christopher L. Shannon AuthorDate: Wed Dec 6 11:22:09 2023 -0500 Merge pull request #1 from apache/dependabot/maven/junit-junit-4.13.1 Bump junit from 4.12 to 4.13.1 pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(activemq-openwire) branch main updated (8e2baa0 -> 2e6320d)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq-openwire.git from 8e2baa0 Disable website generation for now. add 1d8a888 Bump junit from 4.12 to 4.13.1 new 2e6320d Merge pull request #1 from apache/dependabot/maven/junit-junit-4.13.1 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(activemq) branch main updated: AMQ-9388 - Exclude activemq-client-jakarta from camel-activemq
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 3013a3ab3 AMQ-9388 - Exclude activemq-client-jakarta from camel-activemq new cbe7c6a83 Merge pull request #1117 from cshannon/AMQ-9388 3013a3ab3 is described below commit 3013a3ab35a249b46ed6706d66a35f328ede1885 Author: Christopher L. Shannon (cshannon) AuthorDate: Mon Nov 13 17:43:24 2023 -0500 AMQ-9388 - Exclude activemq-client-jakarta from camel-activemq The current version of camel pulls in the activemq-client-jakarta jar which is not necessary as it no longer exists with ActiveMQ 6.0.0. Furthermore, the version being pulled in is 5.18.2, which contains a critical CVE that was fixed in 5.18.3. --- assembly/pom.xml | 9 + 1 file changed, 9 insertions(+) diff --git a/assembly/pom.xml b/assembly/pom.xml index c1b7ca3ae..5743c22bc 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -165,6 +165,15 @@ org.apache.camel camel-activemq + + + + org.apache.activemq + activemq-client-jakarta + + org.apache.camel
[activemq] branch main updated: AMQ-9370 - Improve Openwire marshaller validation test
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 5719e5df2 AMQ-9370 - Improve Openwire marshaller validation test 5719e5df2 is described below commit 5719e5df2e3cb95b1fb5fe5923bbc3a8d02a27ef Author: Christopher L. Shannon (cshannon) AuthorDate: Thu Oct 26 17:53:41 2023 -0400 AMQ-9370 - Improve Openwire marshaller validation test --- .../activemq/openwire/OpenWireValidationTest.java | 18 ++ 1 file changed, 18 insertions(+) diff --git a/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java b/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java index a7a6a4f7c..e5c7687ee 100644 --- a/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java +++ b/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.openwire; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.DataOutput; @@ -24,9 +25,12 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.util.ByteSequence; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -40,6 +44,12 @@ import org.junit.runners.Parameterized.Parameters; public class OpenWireValidationTest { protected final int version; +private static final AtomicBoolean initialized = new AtomicBoolean(false); + +@Before +public void before() { +initialized.set(false); +} 
@Parameters(name = "version={0}") public static Collection data() { @@ -87,11 +97,19 @@ public class OpenWireValidationTest { assertTrue(response.getException() instanceof IllegalArgumentException); assertTrue(response.getException().getMessage().contains("is not assignable to Throwable")); + +// assert the class was never initialized +assertFalse(initialized.get()); } static class NotAThrowable { private String message; +static { +// Class should not be initialized so set flag here to verify +initialized.set(true); +} + public NotAThrowable(String message) { this.message = message; }
[activemq] branch activemq-5.18.x updated: AMQ-9370 - Improve Openwire marshaller validation test
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new d979d7c22 AMQ-9370 - Improve Openwire marshaller validation test d979d7c22 is described below commit d979d7c2261178506641baa55ecb998fe981a599 Author: Christopher L. Shannon (cshannon) AuthorDate: Thu Oct 26 17:53:41 2023 -0400 AMQ-9370 - Improve Openwire marshaller validation test (cherry picked from commit 5719e5df2e3cb95b1fb5fe5923bbc3a8d02a27ef) --- .../activemq/openwire/OpenWireValidationTest.java | 18 ++ 1 file changed, 18 insertions(+) diff --git a/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java b/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java index a7a6a4f7c..e5c7687ee 100644 --- a/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java +++ b/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.openwire; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.DataOutput; @@ -24,9 +25,12 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.util.ByteSequence; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -40,6 +44,12 @@ import org.junit.runners.Parameterized.Parameters; public class OpenWireValidationTest { protected final int version; +private static final AtomicBoolean initialized = new 
AtomicBoolean(false); + +@Before +public void before() { +initialized.set(false); +} @Parameters(name = "version={0}") public static Collection data() { @@ -87,11 +97,19 @@ public class OpenWireValidationTest { assertTrue(response.getException() instanceof IllegalArgumentException); assertTrue(response.getException().getMessage().contains("is not assignable to Throwable")); + +// assert the class was never initialized +assertFalse(initialized.get()); } static class NotAThrowable { private String message; +static { +// Class should not be initialized so set flag here to verify +initialized.set(true); +} + public NotAThrowable(String message) { this.message = message; }
[activemq] branch activemq-5.16.x updated: AMQ-9370 - Fix compatibility in test with java 8
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.16.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.16.x by this push: new 8f3bc AMQ-9370 - Fix compatibility in test with java 8 8f3bc is described below commit 8f3bce1f141676124b0e912caae0b5a3a94f Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Oct 24 08:30:02 2023 -0400 AMQ-9370 - Fix compatibility in test with java 8 --- .../test/java/org/apache/activemq/openwire/OpenWireValidationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java b/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java index a7a6a4f7c..0d5b68bc2 100644 --- a/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java +++ b/activemq-client/src/test/java/org/apache/activemq/openwire/OpenWireValidationTest.java @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import org.apache.activemq.command.CommandTypes; @@ -43,7 +44,7 @@ public class OpenWireValidationTest { @Parameters(name = "version={0}") public static Collection data() { -List versions = List.of(1, 9, 10, 11, 12); +List versions = Arrays.asList(1, 9, 10, 11, 12); List versionObjs = new ArrayList<>(); for (int i : versions) { versionObjs.add(new Object[]{i});
[activemq] branch activemq-5.16.x updated: AMQ-9370 - Openwire marshaller should validate Throwable class type
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.16.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.16.x by this push: new 22442b238 AMQ-9370 - Openwire marshaller should validate Throwable class type 22442b238 is described below commit 22442b2385b1000312aec3d19e510131d595a5fc Author: Christopher L. Shannon (cshannon) AuthorDate: Mon Oct 23 18:24:38 2023 -0400 AMQ-9370 - Openwire marshaller should validate Throwable class type (cherry picked from commit 3eaf3107f4fb9a3ce7ab45c175bfaeac7e866d5b) (cherry picked from commit d0ccdd31544ada83185554c87c7aa141064020f0) --- activemq-client/pom.xml| 11 ++ .../org/apache/activemq/openwire/OpenWireUtil.java | 32 .../openwire/v1/BaseDataStreamMarshaller.java | 4 + .../openwire/v10/BaseDataStreamMarshaller.java | 4 + .../openwire/v11/BaseDataStreamMarshaller.java | 4 + .../openwire/v12/BaseDataStreamMarshaller.java | 4 + .../openwire/v9/BaseDataStreamMarshaller.java | 4 + .../activemq/openwire/OpenWireValidationTest.java | 166 + activemq-openwire-legacy/pom.xml | 12 ++ .../openwire/v2/BaseDataStreamMarshaller.java | 4 + .../openwire/v3/BaseDataStreamMarshaller.java | 4 + .../openwire/v4/BaseDataStreamMarshaller.java | 4 + .../openwire/v5/BaseDataStreamMarshaller.java | 4 + .../openwire/v6/BaseDataStreamMarshaller.java | 4 + .../openwire/v7/BaseDataStreamMarshaller.java | 4 + .../openwire/v8/BaseDataStreamMarshaller.java | 4 + .../openwire/OpenWireLegacyValidationTest.java | 129 pom.xml| 7 + 18 files changed, 405 insertions(+) diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index ba5de326a..52e1ac1e0 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -268,6 +268,17 @@ + + +maven-jar-plugin + + + + test-jar + + + + diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java new file mode 100644 index 0..f52e6c3e0 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire; + +public class OpenWireUtil { + +/** + * Verify that the provided class extends {@link Throwable} and throw an + * {@link IllegalArgumentException} if it does not. 
+ * + * @param clazz + */ +public static void validateIsThrowable(Class clazz) { +if (!Throwable.class.isAssignableFrom(clazz)) { +throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); +} +} +} diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java index b2c997ea7..cfe2716a9 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java @@ -25,6 +25,7 @@ import org.apache.activemq.command.DataStructure; import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.DataStreamMarshaller; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireUtil; import org.apache.activemq.util.ByteSequence; public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { @@ -229,8 +230,11 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String mess
[activemq] branch activemq-5.17.x updated: AMQ-9370 - Openwire marshaller should validate Throwable class type
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new d0ccdd315 AMQ-9370 - Openwire marshaller should validate Throwable class type d0ccdd315 is described below commit d0ccdd31544ada83185554c87c7aa141064020f0 Author: Christopher L. Shannon (cshannon) AuthorDate: Mon Oct 23 18:24:38 2023 -0400 AMQ-9370 - Openwire marshaller should validate Throwable class type (cherry picked from commit 3eaf3107f4fb9a3ce7ab45c175bfaeac7e866d5b) --- activemq-client/pom.xml| 11 ++ .../org/apache/activemq/openwire/OpenWireUtil.java | 32 .../openwire/v1/BaseDataStreamMarshaller.java | 4 + .../openwire/v10/BaseDataStreamMarshaller.java | 4 + .../openwire/v11/BaseDataStreamMarshaller.java | 4 + .../openwire/v12/BaseDataStreamMarshaller.java | 4 + .../openwire/v9/BaseDataStreamMarshaller.java | 4 + .../activemq/openwire/OpenWireValidationTest.java | 166 + activemq-openwire-legacy/pom.xml | 12 ++ .../openwire/v2/BaseDataStreamMarshaller.java | 4 + .../openwire/v3/BaseDataStreamMarshaller.java | 4 + .../openwire/v4/BaseDataStreamMarshaller.java | 4 + .../openwire/v5/BaseDataStreamMarshaller.java | 4 + .../openwire/v6/BaseDataStreamMarshaller.java | 4 + .../openwire/v7/BaseDataStreamMarshaller.java | 4 + .../openwire/v8/BaseDataStreamMarshaller.java | 4 + .../openwire/OpenWireLegacyValidationTest.java | 129 pom.xml| 7 + 18 files changed, 405 insertions(+) diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index 393e115a4..1753820fa 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -273,6 +273,17 @@ + + +maven-jar-plugin + + + + test-jar + + + + diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java new file mode 100644 index 0..f52e6c3e0 --- 
/dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire; + +public class OpenWireUtil { + +/** + * Verify that the provided class extends {@link Throwable} and throw an + * {@link IllegalArgumentException} if it does not. 
+ * + * @param clazz + */ +public static void validateIsThrowable(Class clazz) { +if (!Throwable.class.isAssignableFrom(clazz)) { +throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); +} +} +} diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java index 94ba66d40..a96f8e841 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java @@ -25,6 +25,7 @@ import org.apache.activemq.command.DataStructure; import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.DataStreamMarshaller; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireUtil; import org.apache.activemq.util.ByteSequence; public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { @@ -229,8 +230,11 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { Clas
[activemq] branch activemq-5.18.x updated: AMQ-9370 - Openwire marshaller should validate Throwable class type
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 958330df2 AMQ-9370 - Openwire marshaller should validate Throwable class type 958330df2 is described below commit 958330df26cf3d5cdb63905dc2c6882e98781d8f Author: Christopher L. Shannon (cshannon) AuthorDate: Mon Oct 23 18:24:38 2023 -0400 AMQ-9370 - Openwire marshaller should validate Throwable class type (cherry picked from commit 3eaf3107f4fb9a3ce7ab45c175bfaeac7e866d5b) --- activemq-client/pom.xml| 11 ++ .../org/apache/activemq/openwire/OpenWireUtil.java | 32 .../openwire/v1/BaseDataStreamMarshaller.java | 4 + .../openwire/v10/BaseDataStreamMarshaller.java | 4 + .../openwire/v11/BaseDataStreamMarshaller.java | 4 + .../openwire/v12/BaseDataStreamMarshaller.java | 4 + .../openwire/v9/BaseDataStreamMarshaller.java | 4 + .../activemq/openwire/OpenWireValidationTest.java | 166 + activemq-openwire-legacy/pom.xml | 12 ++ .../openwire/v2/BaseDataStreamMarshaller.java | 4 + .../openwire/v3/BaseDataStreamMarshaller.java | 4 + .../openwire/v4/BaseDataStreamMarshaller.java | 4 + .../openwire/v5/BaseDataStreamMarshaller.java | 4 + .../openwire/v6/BaseDataStreamMarshaller.java | 4 + .../openwire/v7/BaseDataStreamMarshaller.java | 4 + .../openwire/v8/BaseDataStreamMarshaller.java | 4 + .../openwire/OpenWireLegacyValidationTest.java | 129 pom.xml| 7 + 18 files changed, 405 insertions(+) diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index 9befdb793..0a98833ea 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -268,6 +268,17 @@ + + +maven-jar-plugin + + + + test-jar + + + + diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java new file mode 100644 index 0..f52e6c3e0 --- 
/dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire; + +public class OpenWireUtil { + +/** + * Verify that the provided class extends {@link Throwable} and throw an + * {@link IllegalArgumentException} if it does not. 
+ * + * @param clazz + */ +public static void validateIsThrowable(Class clazz) { +if (!Throwable.class.isAssignableFrom(clazz)) { +throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); +} +} +} diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java index 94ba66d40..a96f8e841 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java @@ -25,6 +25,7 @@ import org.apache.activemq.command.DataStructure; import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.DataStreamMarshaller; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireUtil; import org.apache.activemq.util.ByteSequence; public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { @@ -229,8 +230,11 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { Clas
[activemq] branch main updated: AMQ-9370 - Openwire marshaller should validate Throwable class type
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 3eaf3107f AMQ-9370 - Openwire marshaller should validate Throwable class type new 80089f9f4 Merge pull request #1098 from cshannon/openwire-throwable-fix 3eaf3107f is described below commit 3eaf3107f4fb9a3ce7ab45c175bfaeac7e866d5b Author: Christopher L. Shannon (cshannon) AuthorDate: Mon Oct 23 18:24:38 2023 -0400 AMQ-9370 - Openwire marshaller should validate Throwable class type --- activemq-client/pom.xml| 11 ++ .../org/apache/activemq/openwire/OpenWireUtil.java | 32 .../openwire/v1/BaseDataStreamMarshaller.java | 4 + .../openwire/v10/BaseDataStreamMarshaller.java | 4 + .../openwire/v11/BaseDataStreamMarshaller.java | 4 + .../openwire/v12/BaseDataStreamMarshaller.java | 4 + .../openwire/v9/BaseDataStreamMarshaller.java | 4 + .../activemq/openwire/OpenWireValidationTest.java | 166 + activemq-openwire-legacy/pom.xml | 12 ++ .../openwire/v2/BaseDataStreamMarshaller.java | 4 + .../openwire/v3/BaseDataStreamMarshaller.java | 4 + .../openwire/v4/BaseDataStreamMarshaller.java | 4 + .../openwire/v5/BaseDataStreamMarshaller.java | 4 + .../openwire/v6/BaseDataStreamMarshaller.java | 4 + .../openwire/v7/BaseDataStreamMarshaller.java | 4 + .../openwire/v8/BaseDataStreamMarshaller.java | 4 + .../openwire/OpenWireLegacyValidationTest.java | 129 pom.xml| 7 + 18 files changed, 405 insertions(+) diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index b71f9632d..2dd9df097 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -268,6 +268,17 @@ + + +maven-jar-plugin + + + + test-jar + + + + diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java new file mode 100644 index 0..f52e6c3e0 --- /dev/null +++ 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire; + +public class OpenWireUtil { + +/** + * Verify that the provided class extends {@link Throwable} and throw an + * {@link IllegalArgumentException} if it does not. 
+ * + * @param clazz + */ +public static void validateIsThrowable(Class clazz) { +if (!Throwable.class.isAssignableFrom(clazz)) { +throw new IllegalArgumentException("Class " + clazz + " is not assignable to Throwable"); +} +} +} diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java index 94ba66d40..a96f8e841 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java @@ -25,6 +25,7 @@ import org.apache.activemq.command.DataStructure; import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.DataStreamMarshaller; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireUtil; import org.apache.activemq.util.ByteSequence; public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { @@ -229,8 +230,11 @@ public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller { private Throwable createThrowable(String className, String message) { try { Class clazz = Class.forName(className, false, BaseD
[activemq] branch activemq-5.17.x updated: AMQ-9343 - Reduce memory used for in flight transactions
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new 8204181db AMQ-9343 - Reduce memory used for in flight transactions 8204181db is described below commit 8204181db2ca39e5c945fae2e285d751ba58cc97 Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Oct 18 10:35:20 2023 -0400 AMQ-9343 - Reduce memory used for in flight transactions This commit will reduce the memory required in KahaDB for long running transactions and transactions with a lot of pending message sends by clearing out the message memory when no longer needed instead of keeping it tracked in the pending map (cherry picked from commit a26bf256d17135b5baaff0df64fc5cb51bad1cf8) --- .../apache/activemq/store/kahadb/KahaDBStore.java | 23 .../kahadb/KahaDBInFlightTxMemoryUsageTest.java| 142 + 2 files changed, 165 insertions(+) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 06dd0966b..bde01d040 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -572,6 +572,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } } }, null); + +/* + * After we store the command in the journal we no longer need to keep the message + * on the command, and we can clear the field here. + * + * The reason to clear the message is that for messages added as part of a transaction the command + * will be added to the inflightTransactions map as a pending add operation. 
+ * For long-running transactions and/or transactions with a lot of pending messages + * (or large messages) this can use up a decent amount of memory which can increase GC pressure. + * + * The commands are only tracked in the map so that the KahaDB index can be updated later + * on transaction commit, but updating the index only requires metadata from the command + * such as message id or destination and not the message itself, so we can safely clear the field. + * + * Note that on broker restart and recovery of the KahaDB journal the pending message + * adds for transactions will be loaded again and the memory won't be cleared in that case. + * This could be revisited in the future if an issue but that should not be a large + * issue because that's only done on first startup and during recovery and then + * after the broker is recovered the memory footprint will drop. Also, as of now, recovering + * XA transactions in the transaction broker requires loading the messages and acks anyway + * for processing, so we need to load the full message and keep it in the pending operation. + */ +command.clearMessage(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java new file mode 100644 index 0..b8ca94d38 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.kahadb; + +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.lang.reflect.Field; +import j
[activemq] branch activemq-5.18.x updated: AMQ-9343 - Reduce memory used for in flight transactions
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new a26bf256d AMQ-9343 - Reduce memory used for in flight transactions a26bf256d is described below commit a26bf256d17135b5baaff0df64fc5cb51bad1cf8 Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Oct 18 10:35:20 2023 -0400 AMQ-9343 - Reduce memory used for in flight transactions This commit will reduce the memory required in KahaDB for long running transactions and transactions with a lot of pending message sends by clearing out the message memory when no longer needed instead of keeping it tracked in the pending map --- .../apache/activemq/store/kahadb/KahaDBStore.java | 23 .../kahadb/KahaDBInFlightTxMemoryUsageTest.java| 142 + 2 files changed, 165 insertions(+) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 06dd0966b..bde01d040 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -572,6 +572,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } } }, null); + +/* + * After we store the command in the journal we no longer need to keep the message + * on the command, and we can clear the field here. + * + * The reason to clear the message is that for messages added as part of a transaction the command + * will be added to the inflightTransactions map as a pending add operation. + * For long-running transactions and/or transactions with a lot of pending messages + * (or large messages) this can use up a decent amount of memory which can increase GC pressure. 
+ * + * The commands are only tracked in the map so that the KahaDB index can be updated later + * on transaction commit, but updating the index only requires metadata from the command + * such as message id or destination and not the message itself, so we can safely clear the field. + * + * Note that on broker restart and recovery of the KahaDB journal the pending message + * adds for transactions will be loaded again and the memory won't be cleared in that case. + * This could be revisited in the future if an issue but that should not be a large + * issue because that's only done on first startup and during recovery and then + * after the broker is recovered the memory footprint will drop. Also, as of now, recovering + * XA transactions in the transaction broker requires loading the messages and acks anyway + * for processing, so we need to load the full message and keep it in the pending operation. + */ +command.clearMessage(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java new file mode 100644 index 0..b8ca94d38 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.kahadb; + +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.LinkedHashMap; +import java.util.List; +import ja
[activemq] branch main updated: AMQ-9343 - Reduce memory used for in flight transactions
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new c3bef84be AMQ-9343 - Reduce memory used for in flight transactions new 961067ec1 Merge pull request #1075 from cshannon/AMQ-9343 c3bef84be is described below commit c3bef84be5717d91e81b196fac23800097737c5a Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Oct 18 10:35:20 2023 -0400 AMQ-9343 - Reduce memory used for in flight transactions This commit will reduce the memory required in KahaDB for long running transactions and transactions with a lot of pending message sends by clearing out the message memory when no longer needed instead of keeping it tracked in the pending map --- .../apache/activemq/store/kahadb/KahaDBStore.java | 23 .../kahadb/KahaDBInFlightTxMemoryUsageTest.java| 142 + 2 files changed, 165 insertions(+) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 06dd0966b..bde01d040 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -572,6 +572,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } } }, null); + +/* + * After we store the command in the journal we no longer need to keep the message + * on the command, and we can clear the field here. + * + * The reason to clear the message is that for messages added as part of a transaction the command + * will be added to the inflightTransactions map as a pending add operation. 
+ * For long-running transactions and/or transactions with a lot of pending messages + * (or large messages) this can use up a decent amount of memory which can increase GC pressure. + * + * The commands are only tracked in the map so that the KahaDB index can be updated later + * on transaction commit, but updating the index only requires metadata from the command + * such as message id or destination and not the message itself, so we can safely clear the field. + * + * Note that on broker restart and recovery of the KahaDB journal the pending message + * adds for transactions will be loaded again and the memory won't be cleared in that case. + * This could be revisited in the future if an issue but that should not be a large + * issue because that's only done on first startup and during recovery and then + * after the broker is recovered the memory footprint will drop. Also, as of now, recovering + * XA transactions in the transaction broker requires loading the messages and acks anyway + * for processing, so we need to load the full message and keep it in the pending operation. + */ +command.clearMessage(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java new file mode 100644 index 0..ba9ecc5b7 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.kahadb; + +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.Li
[activemq] branch activemq-5.18.x updated: AMQ-9255 - fix jms package imports
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new edba2484d AMQ-9255 - fix jms package imports edba2484d is described below commit edba2484dedd49b5f2f858aa343a0e273da594a1 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue Oct 3 08:04:28 2023 -0400 AMQ-9255 - fix jms package imports --- .../java/org/apache/activemq/bugs/AMQ9255Test.java | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java index ad0f9f356..bdea6ad9a 100644 --- a/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java +++ b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java @@ -16,13 +16,16 @@ */ package org.apache.activemq.bugs; -import jakarta.jms.Connection; -import jakarta.jms.DeliveryMode; -import jakarta.jms.MessageConsumer; -import jakarta.jms.MessageProducer; -import jakarta.jms.Session; -import jakarta.jms.TextMessage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Connection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.transport.http.WaitForJettyListener; @@ -34,9 +37,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - public class AMQ9255Test { private static final Logger LOG = LoggerFactory.getLogger(AMQ9255Test.class);
[activemq] branch activemq-5.18.x updated: [AMQ-9255] Initialize the transient field of the class Message (#1051)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 087d6aed5 [AMQ-9255] Initialize the transient field of the class Message (#1051) 087d6aed5 is described below commit 087d6aed5c0282330c5b64764fdbe5b29d9d1f6d Author: Nicolas Filotto AuthorDate: Tue Oct 3 13:54:40 2023 +0200 [AMQ-9255] Initialize the transient field of the class Message (#1051) (cherry picked from commit 538b04aa0c18f61cd47b261c7372a3e559c2ca0e) --- .../activemq/command/ActiveMQMapMessage.java | 4 +- .../java/org/apache/activemq/command/Message.java | 13 ++ .../java/org/apache/activemq/bugs/AMQ9255Test.java | 150 + 3 files changed, 166 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java index 5619e87e8..a00e86803 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java @@ -104,7 +104,9 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage { protected transient Map map = new HashMap(); -private Object readResolve() throws ObjectStreamException { +@Override +protected Object readResolve() throws ObjectStreamException { +super.readResolve(); if (this.map == null) { this.map = new HashMap(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index e74e1f39a..851696254 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -20,6 +20,7 @@ import java.beans.Transient; import 
java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.ObjectStreamException; import java.io.OutputStream; import java.util.Collections; import java.util.HashMap; @@ -858,4 +859,16 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess public boolean canProcessAsExpired() { return processAsExpired.compareAndSet(false, true); } + +/** + * Initialize the transient fields at deserialization to get a normal state. + * + * @see <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/Serializable.html">Serializable Javadoc</a> + */ +protected Object readResolve() throws ObjectStreamException { +if (this.processAsExpired == null) { +this.processAsExpired = new AtomicBoolean(); +} +return this; +} } diff --git a/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java new file mode 100644 index 0..ad0f9f356 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.http.WaitForJettyListener; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ9255Test { + +private static final Logger LOG = LoggerFactory.getLogger(AMQ9255Test.class); + +@Rule +pub
[activemq] branch main updated: [AMQ-9255] Initialize the transient field of the class Message (#1051)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 538b04aa0 [AMQ-9255] Initialize the transient field of the class Message (#1051) 538b04aa0 is described below commit 538b04aa0c18f61cd47b261c7372a3e559c2ca0e Author: Nicolas Filotto AuthorDate: Tue Oct 3 13:54:40 2023 +0200 [AMQ-9255] Initialize the transient field of the class Message (#1051) --- .../activemq/command/ActiveMQMapMessage.java | 4 +- .../java/org/apache/activemq/command/Message.java | 13 ++ .../java/org/apache/activemq/bugs/AMQ9255Test.java | 150 + 3 files changed, 166 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java index 8384c3473..1bd6e5362 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java @@ -104,7 +104,9 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage { protected transient Map map = new HashMap(); -private Object readResolve() throws ObjectStreamException { +@Override +protected Object readResolve() throws ObjectStreamException { +super.readResolve(); if (this.map == null) { this.map = new HashMap(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index 88e9787cb..2a31047c9 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -20,6 +20,7 @@ import java.beans.Transient; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import 
java.io.ObjectStreamException; import java.io.OutputStream; import java.util.Collections; import java.util.HashMap; @@ -858,4 +859,16 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess public boolean canProcessAsExpired() { return processAsExpired.compareAndSet(false, true); } + +/** + * Initialize the transient fields at deserialization to get a normal state. + * + * @see <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/Serializable.html">Serializable Javadoc</a> + */ +protected Object readResolve() throws ObjectStreamException { +if (this.processAsExpired == null) { +this.processAsExpired = new AtomicBoolean(); +} +return this; +} } diff --git a/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java new file mode 100644 index 0..ad0f9f356 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.http.WaitForJettyListener; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ9255Test { + +private static final Logger LOG = LoggerFactory.getLogger(AMQ9255Test.class); + +@Rule +public TestName name = new TestName(); +private BrokerService broker; +private ActiveMQConnectionFa
[activemq] 02/02: [AMQ-9258] Update kahadb corruption test to account for new fix from AMQ-9254 (#1007)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 697feeccb0a14caa464e09adcde43fd58b30bc55 Author: Matt Pavlovich AuthorDate: Thu May 18 11:29:01 2023 -0500 [AMQ-9258] Update kahadb corruption test to account for new fix from AMQ-9254 (#1007) (cherry picked from commit cfbea60d6d4f934e7fbe85915183a2f211414b82) --- .../store/kahadb/JournalCorruptionEofIndexRecoveryTest.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index da89e5fd83..d12474523a 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -239,14 +240,20 @@ public class JournalCorruptionEofIndexRecoveryTest { final var appender = new AbstractAppender("testAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { @Override public void append(LogEvent event) { +/** + * NOTE: As of JDK v11.0.19 RandomAccessFile throws a messageless EOFException when read fails + * + * throw new EOFException(); + */ if (event != null && event.getLevel() == Level.WARN && event.getMessage() != null && event.getMessage().getFormattedMessage() != null && event.getMessage().getFormattedMessage().contains("Cannot recover message audit") && event.getThrown() != null -&& 
event.getThrown().getLocalizedMessage() != null -&& event.getThrown().getLocalizedMessage().contains("Invalid location size")) { +&& event.getThrown() instanceof EOFException +&& event.getThrown().getMessage() == null) { + trappedExpectedLogMessage.set(true); } } @@ -263,6 +270,8 @@ public class JournalCorruptionEofIndexRecoveryTest { } assertEquals("no missing message", 50, broker.getAdminView().getTotalMessageCount()); +assertEquals("Drain", 50, drainQueue(50)); +assertEquals("no problem draining messages", 0, broker.getAdminView().getTotalMessageCount()); assertTrue("Did replay records on invalid location size", trappedExpectedLogMessage.get()); }
[activemq] branch activemq-5.17.x updated (53f2b4a0cb -> 697feeccb0)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git from 53f2b4a0cb [maven-release-plugin] prepare for next development iteration new 3d21425d37 [AMQ-9258] Add NPE guards to kahadb test that intermittently fails on slower CI servers (#1006) new 697feeccb0 [AMQ-9258] Update kahadb corruption test to account for new fix from AMQ-9254 (#1007) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../kahadb/JournalCorruptionEofIndexRecoveryTest.java | 18 -- 1 file changed, 16 insertions(+), 2 deletions(-)
[activemq] 01/02: [AMQ-9258] Add NPE guards to kahadb test that intermittently fails on slower CI servers (#1006)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 3d21425d37c9dd997493676e24577f49c31c Author: Matt Pavlovich AuthorDate: Wed May 17 10:27:31 2023 -0500 [AMQ-9258] Add NPE guards to kahadb test that intermittently fails on slower CI servers (#1006) (cherry picked from commit 0a042964c8e0d8c5f694728ded6d278810756bc7) --- .../store/kahadb/JournalCorruptionEofIndexRecoveryTest.java| 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index 0033e0b2c7..da89e5fd83 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -239,8 +239,13 @@ public class JournalCorruptionEofIndexRecoveryTest { final var appender = new AbstractAppender("testAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { @Override public void append(LogEvent event) { -if (event.getLevel() == Level.WARN +if (event != null +&& event.getLevel() == Level.WARN +&& event.getMessage() != null +&& event.getMessage().getFormattedMessage() != null && event.getMessage().getFormattedMessage().contains("Cannot recover message audit") +&& event.getThrown() != null +&& event.getThrown().getLocalizedMessage() != null && event.getThrown().getLocalizedMessage().contains("Invalid location size")) { trappedExpectedLogMessage.set(true); }
[activemq] branch activemq-5.18.x updated (20ed8ba70f -> fa72c13612)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git from 20ed8ba70f [maven-release-plugin] prepare for next development iteration new a62add45e7 [AMQ-9258] Add NPE guards to kahadb test that intermittently fails on slower CI servers (#1006) new fa72c13612 [AMQ-9258] Update kahadb corruption test to account for new fix from AMQ-9254 (#1007) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../kahadb/JournalCorruptionEofIndexRecoveryTest.java | 18 -- 1 file changed, 16 insertions(+), 2 deletions(-)
[activemq] 02/02: [AMQ-9258] Update kahadb corruption test to account for new fix from AMQ-9254 (#1007)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit fa72c13612e816603b42245de54b4d4dc8fc3dcd Author: Matt Pavlovich AuthorDate: Thu May 18 11:29:01 2023 -0500 [AMQ-9258] Update kahadb corruption test to account for new fix from AMQ-9254 (#1007) (cherry picked from commit cfbea60d6d4f934e7fbe85915183a2f211414b82) --- .../store/kahadb/JournalCorruptionEofIndexRecoveryTest.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index da89e5fd83..d12474523a 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -239,14 +240,20 @@ public class JournalCorruptionEofIndexRecoveryTest { final var appender = new AbstractAppender("testAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { @Override public void append(LogEvent event) { +/** + * NOTE: As of JDK v11.0.19 RandomAccessFile throws a messageless EOFException when read fails + * + * throw new EOFException(); + */ if (event != null && event.getLevel() == Level.WARN && event.getMessage() != null && event.getMessage().getFormattedMessage() != null && event.getMessage().getFormattedMessage().contains("Cannot recover message audit") && event.getThrown() != null -&& 
event.getThrown().getLocalizedMessage() != null -&& event.getThrown().getLocalizedMessage().contains("Invalid location size")) { +&& event.getThrown() instanceof EOFException +&& event.getThrown().getMessage() == null) { + trappedExpectedLogMessage.set(true); } } @@ -263,6 +270,8 @@ public class JournalCorruptionEofIndexRecoveryTest { } assertEquals("no missing message", 50, broker.getAdminView().getTotalMessageCount()); +assertEquals("Drain", 50, drainQueue(50)); +assertEquals("no problem draining messages", 0, broker.getAdminView().getTotalMessageCount()); assertTrue("Did replay records on invalid location size", trappedExpectedLogMessage.get()); }
[activemq] 01/02: [AMQ-9258] Add NPE guards to kahadb test that intermittently fails on slower CI servers (#1006)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit a62add45e7276bb767cd19082c7dd38404c117b6 Author: Matt Pavlovich AuthorDate: Wed May 17 10:27:31 2023 -0500 [AMQ-9258] Add NPE guards to kahadb test that intermittently fails on slower CI servers (#1006) (cherry picked from commit 0a042964c8e0d8c5f694728ded6d278810756bc7) --- .../store/kahadb/JournalCorruptionEofIndexRecoveryTest.java| 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index 0033e0b2c7..da89e5fd83 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -239,8 +239,13 @@ public class JournalCorruptionEofIndexRecoveryTest { final var appender = new AbstractAppender("testAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { @Override public void append(LogEvent event) { -if (event.getLevel() == Level.WARN +if (event != null +&& event.getLevel() == Level.WARN +&& event.getMessage() != null +&& event.getMessage().getFormattedMessage() != null && event.getMessage().getFormattedMessage().contains("Cannot recover message audit") +&& event.getThrown() != null +&& event.getThrown().getLocalizedMessage() != null && event.getThrown().getLocalizedMessage().contains("Invalid location size")) { trappedExpectedLogMessage.set(true); }
[activemq] branch activemq-5.17.x updated: AMQ-9262 - Fix network subscriptions for composite consumers (#1014)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new 1707cc194b AMQ-9262 - Fix network subscriptions for composite consumers (#1014) 1707cc194b is described below commit 1707cc194b1c74808c67ff54db8b909f8ef4b990 Author: Christopher L. Shannon AuthorDate: Wed Jun 7 07:18:18 2023 -0400 AMQ-9262 - Fix network subscriptions for composite consumers (#1014) This fixes network subscriptions that are generated on demand when a consumer uses composite destinations. Before this fix conduit subscriptions didn't work correctly. This fix now splits up the composite dest and generates correct demand for each of the individual destinations. (cherry picked from commit 901956d4ddb6a0ea9fe5fedf39732117ab68f087) --- .../network/DemandForwardingBridgeSupport.java | 99 - .../CompositeConsumerNetworkBridgeTest.java| 435 + .../network/DurableSyncNetworkBridgeTest.java | 15 +- .../network/DynamicNetworkTestSupport.java | 61 ++- .../network/ForceDurableNetworkBridgeTest.java | 9 +- 5 files changed, 567 insertions(+), 52 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 28d136fe84..57afc85d11 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected final ConcurrentMap subscriptionMapByLocalId = new ConcurrentHashMap<>(); protected final ConcurrentMap subscriptionMapByRemoteId = new ConcurrentHashMap<>(); -protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap()); +protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<>()); +protected final ConcurrentMap> compositeConsumerIds = new ConcurrentHashMap<>(); +protected final ConcurrentMap> compositeSubscriptions = new ConcurrentHashMap<>(); protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch localStartedLatch = new CountDownLatch(1); @@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); + +// If we have an entry in compositeConsumerIds then this consumer was a +// composite consumer and we need to remove the entries in the set and +// not the consumer id we received here +final Set compositeIds = compositeConsumerIds.remove(id); +if (compositeIds != null) { +for (ConsumerId compositeId : compositeIds) { +serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId)); +} +return; +} + removeDemandSubscription(id); if (forcedDurableRemoteId.remove(id)) { @@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveSubscriptionInfo.class) { final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + +// If we have an entry in compositeSubscriptions then this consumer was a +// composite consumer and we need to remove the entries in the 
set and not +// the subscription that we received here +final Set compositeSubs = +this.compositeSubscriptions.remove(subscriptionInfo); +if (compositeSubs != null) { +for (SubscriptionInfo compositeSub : compositeSubs) { +RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo(); +remove.setClientId(compositeSub.getClientId()); + remove.setSubscriptionN
[activemq] branch activemq-5.18.x updated: AMQ-9262 - Fix network subscriptions for composite consumers (#1014)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new 04cb6585bd AMQ-9262 - Fix network subscriptions for composite consumers (#1014) 04cb6585bd is described below commit 04cb6585bde51818eff089a2823b20e9e4a62991 Author: Christopher L. Shannon AuthorDate: Wed Jun 7 07:18:18 2023 -0400 AMQ-9262 - Fix network subscriptions for composite consumers (#1014) This fixes network subscriptions that are generated on demand when a consumer uses composite destinations. Before this fix conduit subscriptions didn't work correctly. This fix now splits up the composite dest and generates correct demand for each of the individual destinations. (cherry picked from commit 901956d4ddb6a0ea9fe5fedf39732117ab68f087) --- .../network/DemandForwardingBridgeSupport.java | 99 - .../CompositeConsumerNetworkBridgeTest.java| 435 + .../network/DurableSyncNetworkBridgeTest.java | 15 +- .../network/DynamicNetworkTestSupport.java | 61 ++- .../network/ForceDurableNetworkBridgeTest.java | 9 +- 5 files changed, 567 insertions(+), 52 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 28d136fe84..57afc85d11 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected final ConcurrentMap subscriptionMapByLocalId = new ConcurrentHashMap<>(); protected final ConcurrentMap subscriptionMapByRemoteId = new ConcurrentHashMap<>(); -protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap()); +protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<>()); +protected final ConcurrentMap> compositeConsumerIds = new ConcurrentHashMap<>(); +protected final ConcurrentMap> compositeSubscriptions = new ConcurrentHashMap<>(); protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch localStartedLatch = new CountDownLatch(1); @@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); + +// If we have an entry in compositeConsumerIds then this consumer was a +// composite consumer and we need to remove the entries in the set and +// not the consumer id we received here +final Set compositeIds = compositeConsumerIds.remove(id); +if (compositeIds != null) { +for (ConsumerId compositeId : compositeIds) { +serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId)); +} +return; +} + removeDemandSubscription(id); if (forcedDurableRemoteId.remove(id)) { @@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveSubscriptionInfo.class) { final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + +// If we have an entry in compositeSubscriptions then this consumer was a +// composite consumer and we need to remove the entries in the 
set and not +// the subscription that we received here +final Set compositeSubs = +this.compositeSubscriptions.remove(subscriptionInfo); +if (compositeSubs != null) { +for (SubscriptionInfo compositeSub : compositeSubs) { +RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo(); +remove.setClientId(compositeSub.getClientId()); + remove.setSubscriptionN
[activemq] branch main updated: AMQ-9262 - Fix network subscriptions for composite consumers (#1014)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 901956d4dd AMQ-9262 - Fix network subscriptions for composite consumers (#1014) 901956d4dd is described below commit 901956d4ddb6a0ea9fe5fedf39732117ab68f087 Author: Christopher L. Shannon AuthorDate: Wed Jun 7 07:18:18 2023 -0400 AMQ-9262 - Fix network subscriptions for composite consumers (#1014) This fixes network subscriptions that are generated on demand when a consumer uses composite destinations. Before this fix conduit subscriptions didn't work correctly. This fix now splits up the composite dest and generates correct demand for each of the individual destinations. --- .../network/DemandForwardingBridgeSupport.java | 99 - .../CompositeConsumerNetworkBridgeTest.java| 435 + .../network/DurableSyncNetworkBridgeTest.java | 15 +- .../network/DynamicNetworkTestSupport.java | 61 ++- .../network/ForceDurableNetworkBridgeTest.java | 9 +- 5 files changed, 567 insertions(+), 52 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 28d136fe84..57afc85d11 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected 
final ConcurrentMap subscriptionMapByLocalId = new ConcurrentHashMap<>(); protected final ConcurrentMap subscriptionMapByRemoteId = new ConcurrentHashMap<>(); -protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap()); +protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<>()); +protected final ConcurrentMap> compositeConsumerIds = new ConcurrentHashMap<>(); +protected final ConcurrentMap> compositeSubscriptions = new ConcurrentHashMap<>(); protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch localStartedLatch = new CountDownLatch(1); @@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); + +// If we have an entry in compositeConsumerIds then this consumer was a +// composite consumer and we need to remove the entries in the set and +// not the consumer id we received here +final Set compositeIds = compositeConsumerIds.remove(id); +if (compositeIds != null) { +for (ConsumerId compositeId : compositeIds) { +serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId)); +} +return; +} + removeDemandSubscription(id); if (forcedDurableRemoteId.remove(id)) { @@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveSubscriptionInfo.class) { final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + +// If we have an entry in compositeSubscriptions then this consumer was a +// composite consumer and we need to remove the entries in the set and not +// the subscription that we received here +final Set compositeSubs = 
+this.compositeSubscriptions.remove(subscriptionInfo); +if (compositeSubs != null) { +for (SubscriptionInfo compositeSub : compositeSubs) { +RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo(); +remove.setClientId(compositeSub.getClientId()); + remove.setSubscriptionName(compositeSub.getSubscriptionName()); + remove.setConnectionId(this
[activemq] branch main updated: [AMQ-9259] Remove activemq-partition and zookeeper test dependency
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 67f5afa0f4 [AMQ-9259] Remove activemq-partition and zookeeper test dependency new 53f9390c41 Merge pull request #1009 from mattrpav/AMQ-9259 67f5afa0f4 is described below commit 67f5afa0f44a0c21992f81e5262970a8e14e172a Author: Matt Pavlovich AuthorDate: Mon May 22 10:59:41 2023 -0500 [AMQ-9259] Remove activemq-partition and zookeeper test dependency --- activemq-osgi/pom.xml | 5 - activemq-partition/pom.xml | 149 -- .../apache/activemq/partition/PartitionBroker.java | 367 - .../activemq/partition/PartitionBrokerPlugin.java | 66 --- .../org/apache/activemq/partition/ZKClient.java| 596 - .../partition/ZooKeeperPartitionBroker.java| 124 - .../partition/ZooKeeperPartitionBrokerPlugin.java | 68 --- .../activemq/partition/dto/Partitioning.java | 161 -- .../org/apache/activemq/partition/dto/Target.java | 59 -- .../activemq/partition/PartitionBrokerTest.java| 251 - .../partition/ZooKeeperPartitionBrokerTest.java| 97 activemq-spring/pom.xml| 16 - activemq-unit-tests/pom.xml| 4 - .../partition/SpringPartitionBrokerTest.java | 53 -- .../src/test/resources/activemq-partition.xml | 58 -- assembly/pom.xml | 17 - assembly/src/main/descriptors/common-bin.xml | 1 - pom.xml| 67 --- 18 files changed, 2159 deletions(-) diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index 03d29cd7a0..7b99b34119 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -68,10 +68,6 @@ ${project.groupId} activemq-http - - ${project.groupId} - activemq-partition - @@ -187,7 +183,6 @@ org.codehaus.jettison*;resolution:=optional, org.jasypt*;resolution:=optional, org.eclipse.jetty*;resolution:=optional;version="[9.0,10)", - org.apache.zookeeper*;resolution:=optional, org.fusesource.hawtjni*;resolution:=optional, 
org.springframework.jms*;version="[4,6)";resolution:=optional, org.springframework.transaction*;version="[4,6)";resolution:=optional, diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml deleted file mode 100644 index 901c869d3a..00 --- a/activemq-partition/pom.xml +++ /dev/null @@ -1,149 +0,0 @@ - - -http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> - - 4.0.0 - - -org.apache.activemq -activemq-parent -5.19.0-SNAPSHOT - - - activemq-partition - jar - - ActiveMQ :: Partition Management - Used to partition clients over a cluster of brokers - - - - org.apache.activemq - activemq-broker - provided - - - - org.slf4j - slf4j-api - compile - - - - org.linkedin - org.linkedin.zookeeper-impl - - - org.linkedin - org.linkedin.util-core - - - org.apache.zookeeper - zookeeper - - - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-databind - - - - - org.apache.logging.log4j - log4j-core - test - - - org.apache.logging.log4j - log4j-slf4j2-impl - test - - - - org.apache.activemq - activemq-broker - test-jar - test - - - - junit - junit - test - - - - - - - - - - activemq.tests-sanity - - - activemq.tests - smoke - - - - - -maven-surefire-plugin - - -**/PartitionBrokerTest.* - - - - - - - - activemq.tests-autoTransport - - - activemq.tests - autoTransport - - - - - -maven-surefire-plugin - - -** - - - - - - - - - diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java deleted file mode 100644 index 9362e64b26..000
[activemq] 02/02: AMQ-9254 - Rework data file size validation and add unit test
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 6a64a0817e93991515eb89ee09ddc468452bd1c3 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue May 2 19:09:23 2023 -0400 AMQ-9254 - Rework data file size validation and add unit test This isolates the validation on data file length on read and adds unit tests to verify we properly fallback to the real file length on initial size check failure (cherry picked from commit bcc74f93fe6dbbd5c795c35484db8efa29b254b6) --- activemq-kahadb-store/pom.xml | 5 ++ .../kahadb/disk/journal/DataFileAccessor.java | 37 + .../kahadb/disk/journal/DataFileAccessorTest.java | 96 ++ 3 files changed, 123 insertions(+), 15 deletions(-) diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml index 85d816810c..3ac84901eb 100644 --- a/activemq-kahadb-store/pom.xml +++ b/activemq-kahadb-store/pom.xml @@ -136,6 +136,11 @@ jmock-legacy test + + org.mockito + mockito-core + test + diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index c9671f2820..dbe151d507 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.IOException; import java.util.Map; +import java.util.Objects; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; @@ -75,7 +76,6 @@ final class DataFileAccessor { } try { - if (location.getSize() == Location.NOT_SET) { file.seek(location.getOffset()); 
location.setSize(file.readInt()); @@ -83,19 +83,7 @@ final class DataFileAccessor { } else { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); } -if ((long)location.getOffset() + location.getSize() > dataFile.length) { -/** - * AMQ-9254 if the read request is outside expected dataFile length, - * perform expensive OS file length lookup operation - * to allow read operation if it will succeed - */ -long osFileLength = dataFile.getFile().length(); -if((long)location.getOffset() + location.getSize() > osFileLength) { -throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); -} else { -LOG.warn("DataFile:{} actual length:{} larger than expected:{} for readRecord location:{} size:{}", dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize()); -} -} +validateFileLength(location); byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE]; file.readFully(data); return new ByteSequence(data, 0, data.length); @@ -127,7 +115,6 @@ final class DataFileAccessor { } } - public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); @@ -141,4 +128,24 @@ final class DataFileAccessor { public RecoverableRandomAccessFile getRaf() { return file; } + +void validateFileLength(final Location location) throws IOException { +final long recordEnd = location.getOffset() + location.getSize(); + +//Check if the end of the record will go past the file length +if (recordEnd > dataFile.length) { +/* + * AMQ-9254 if the read request is outside expected dataFile length, + * perform expensive OS file length lookup operation to allow read + * operation if it will succeed + */ +final long osFileLength = dataFile.getFile().length(); +if(recordEnd > osFileLength) { +throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); +} else { +LOG.warn("DataFile:{} actual length:{} larger than 
expected:{} for readRecord location:{} size:{}", +dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize()); +} +
[activemq] 01/02: [#9254] DataFile readRecord fallback to OS file.length in rare edge case
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 053589eb04db555dfed17a677c78b7ba21bc7191 Author: Matt Pavlovich AuthorDate: Mon May 1 20:08:16 2023 -0500 [#9254] DataFile readRecord fallback to OS file.length in rare edge case (cherry picked from commit 3e61a200d07c4f788542e3e0a434e3e7db25cd81) --- .../store/kahadb/disk/journal/DataFileAccessor.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index 57df143b56..c9671f2820 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -17,7 +17,6 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.Map; import org.apache.activemq.util.ByteSequence; @@ -85,7 +84,17 @@ final class DataFileAccessor { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); } if ((long)location.getOffset() + location.getSize() > dataFile.length) { -throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); +/** + * AMQ-9254 if the read request is outside expected dataFile length, + * perform expensive OS file length lookup operation + * to allow read operation if it will succeed + */ +long osFileLength = dataFile.getFile().length(); +if((long)location.getOffset() + location.getSize() > osFileLength) { +throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); +} else { +LOG.warn("DataFile:{} actual length:{} larger than expected:{} for 
readRecord location:{} size:{}", dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize()); +} } byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE]; file.readFully(data);
[activemq] branch activemq-5.17.x updated (dd95b2d874 -> 6a64a0817e)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git from dd95b2d874 [AMQ-9231] Upgrade to Jetty 9.4.51.v20230217 new 053589eb04 [#9254] DataFile readRecord fallback to OS file.length in rare edge case new 6a64a0817e AMQ-9254 - Rework data file size validation and add unit test The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: activemq-kahadb-store/pom.xml | 5 ++ .../kahadb/disk/journal/DataFileAccessor.java | 28 +-- .../kahadb/disk/journal/DataFileAccessorTest.java | 96 ++ 3 files changed, 123 insertions(+), 6 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorTest.java
[activemq] branch activemq-5.18.x updated (add6356261 -> 78a399cb87)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git from add6356261 AMQ-9243 - Remove deprecated jetty-continuation dependency (#998) new cd676023cc [#9254] DataFile readRecord fallback to OS file.length in rare edge case new 78a399cb87 AMQ-9254 - Rework data file size validation and add unit test The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: activemq-kahadb-store/pom.xml | 5 ++ .../kahadb/disk/journal/DataFileAccessor.java | 28 +-- .../kahadb/disk/journal/DataFileAccessorTest.java | 96 ++ 3 files changed, 123 insertions(+), 6 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorTest.java
[activemq] 01/02: [#9254] DataFile readRecord fallback to OS file.length in rare edge case
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit cd676023cc1989bbe565d806a5d3df3267519a51 Author: Matt Pavlovich AuthorDate: Mon May 1 20:08:16 2023 -0500 [#9254] DataFile readRecord fallback to OS file.length in rare edge case (cherry picked from commit 3e61a200d07c4f788542e3e0a434e3e7db25cd81) --- .../store/kahadb/disk/journal/DataFileAccessor.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index 57df143b56..c9671f2820 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -17,7 +17,6 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.Map; import org.apache.activemq.util.ByteSequence; @@ -85,7 +84,17 @@ final class DataFileAccessor { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); } if ((long)location.getOffset() + location.getSize() > dataFile.length) { -throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); +/** + * AMQ-9254 if the read request is outside expected dataFile length, + * perform expensive OS file length lookup operation + * to allow read operation if it will succeed + */ +long osFileLength = dataFile.getFile().length(); +if((long)location.getOffset() + location.getSize() > osFileLength) { +throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); +} else { +LOG.warn("DataFile:{} actual length:{} larger than expected:{} for 
readRecord location:{} size:{}", dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize()); +} } byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE]; file.readFully(data);
[activemq] 02/02: AMQ-9254 - Rework data file size validation and add unit test
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 78a399cb8714c6b7cd0bbcaa6be21fa217399033 Author: Christopher L. Shannon (cshannon) AuthorDate: Tue May 2 19:09:23 2023 -0400 AMQ-9254 - Rework data file size validation and add unit test This isolates the validation on data file length on read and adds unit tests to verify we properly fallback to the real file length on initial size check failure (cherry picked from commit bcc74f93fe6dbbd5c795c35484db8efa29b254b6) --- activemq-kahadb-store/pom.xml | 5 ++ .../kahadb/disk/journal/DataFileAccessor.java | 37 + .../kahadb/disk/journal/DataFileAccessorTest.java | 96 ++ 3 files changed, 123 insertions(+), 15 deletions(-) diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml index 2f7efe853f..9765a413a6 100644 --- a/activemq-kahadb-store/pom.xml +++ b/activemq-kahadb-store/pom.xml @@ -137,6 +137,11 @@ jmock-legacy test + + org.mockito + mockito-core + test + diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index c9671f2820..dbe151d507 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.IOException; import java.util.Map; +import java.util.Objects; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; @@ -75,7 +76,6 @@ final class DataFileAccessor { } try { - if (location.getSize() == Location.NOT_SET) { file.seek(location.getOffset()); 
location.setSize(file.readInt()); @@ -83,19 +83,7 @@ final class DataFileAccessor { } else { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); } -if ((long)location.getOffset() + location.getSize() > dataFile.length) { -/** - * AMQ-9254 if the read request is outside expected dataFile length, - * perform expensive OS file length lookup operation - * to allow read operation if it will succeed - */ -long osFileLength = dataFile.getFile().length(); -if((long)location.getOffset() + location.getSize() > osFileLength) { -throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); -} else { -LOG.warn("DataFile:{} actual length:{} larger than expected:{} for readRecord location:{} size:{}", dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize()); -} -} +validateFileLength(location); byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE]; file.readFully(data); return new ByteSequence(data, 0, data.length); @@ -127,7 +115,6 @@ final class DataFileAccessor { } } - public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); @@ -141,4 +128,24 @@ final class DataFileAccessor { public RecoverableRandomAccessFile getRaf() { return file; } + +void validateFileLength(final Location location) throws IOException { +final long recordEnd = location.getOffset() + location.getSize(); + +//Check if the end of the record will go past the file length +if (recordEnd > dataFile.length) { +/* + * AMQ-9254 if the read request is outside expected dataFile length, + * perform expensive OS file length lookup operation to allow read + * operation if it will succeed + */ +final long osFileLength = dataFile.getFile().length(); +if(recordEnd > osFileLength) { +throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); +} else { +LOG.warn("DataFile:{} actual length:{} larger than 
expected:{} for readRecord location:{} size:{}", +dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize()); +} +
[activemq] branch main updated (6b277a4903 -> b8b5dedb78)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 6b277a4903 [AMQ-9239] Host joram jms unit tests ahead of converting to jakarta.jms (#1002) new 3e61a200d0 [#9254] DataFile readRecord fallback to OS file.length in rare edge case new bcc74f93fe AMQ-9254 - Rework data file size validation and add unit test new b8b5dedb78 Merge pull request #1004 from mattrpav/AMQ-9254 The 11375 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: activemq-kahadb-store/pom.xml | 5 ++ .../kahadb/disk/journal/DataFileAccessor.java | 28 +-- .../kahadb/disk/journal/DataFileAccessorTest.java | 96 ++ 3 files changed, 123 insertions(+), 6 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorTest.java
[activemq] branch activemq-5.18.x updated: AMQ-9243 - Remove deprecated jetty-continuation dependency (#998)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new add635626 AMQ-9243 - Remove deprecated jetty-continuation dependency (#998) add635626 is described below commit add6356261d7b3fdc743293a9581a0b2f0c6b8a6 Author: Christopher L. Shannon AuthorDate: Thu Apr 13 07:49:22 2023 -0400 AMQ-9243 - Remove deprecated jetty-continuation dependency (#998) This commit removes the dependency on jetty-continuation and updates the MessageServlet and AjaxListener servlets to use the Servlet Async api directly for async requests through a new Async holder object that is used to track/manage the request. (cherry picked from commit 905f00c843b96996b25017e1b8646de15d703398) --- activemq-web-console/pom.xml | 4 - .../java/org/apache/activemq/web/RestTest.java | 34 + activemq-web/pom.xml | 5 - .../java/org/apache/activemq/web/AjaxListener.java | 26 ++-- .../activemq/web/MessageListenerServlet.java | 43 ++ .../org/apache/activemq/web/MessageServlet.java| 71 - .../activemq/web/async/AsyncServletRequest.java| 168 + 7 files changed, 261 insertions(+), 90 deletions(-) diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml index 21fa35dc8..c99df42de 100644 --- a/activemq-web-console/pom.xml +++ b/activemq-web-console/pom.xml @@ -211,10 +211,6 @@ org.eclipse.jetty.websocket websocket-server - - org.eclipse.jetty - jetty-continuation - diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 872f501b7..9319601e9 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -59,6 +59,40 @@ public class RestTest extends JettyTestSupport { assertEquals("test", 
buf.toString()); } +@Test(timeout = 60 * 1000) +public void testConsumeAsync() throws Exception { +int port = getPort(); +HttpClient httpClient = new HttpClient(); +httpClient.start(); + +final StringBuffer buf = new StringBuffer(); +final CountDownLatch latch = +asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=5000&type=queue", buf); + +//Sleep 2 seconds before sending, should still get the response as timeout is 5 seconds +Thread.sleep(2000); +producer.send(session.createTextMessage("test")); +LOG.info("message sent"); + +latch.await(); +assertEquals("test", buf.toString()); +} + +@Test(timeout = 60 * 1000) +public void testConsumeAsyncTimeout() throws Exception { +int port = getPort(); +HttpClient httpClient = new HttpClient(); +httpClient.start(); + +final StringBuffer buf = new StringBuffer(); +final CountDownLatch latch = +asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=1000&type=queue", buf); + +//Test timeout, no message was sent +latch.await(); +assertTrue(buf.toString().contains("AsyncContext timeout")); +} + @Test(timeout = 60 * 1000) public void testSubscribeFirst() throws Exception { int port = getPort(); diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml index 9ef75d62b..5f4f230bd 100644 --- a/activemq-web/pom.xml +++ b/activemq-web/pom.xml @@ -94,11 +94,6 @@ org.eclipse.jetty.websocket websocket-server - - org.eclipse.jetty - jetty-continuation - ${jetty-version} - diff --git a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java index 6d355c2d5..3a149f3bf 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java @@ -17,17 +17,15 @@ package org.apache.activemq.web; import java.util.LinkedList; - import javax.jms.Message; import javax.jms.MessageConsumer; - import org.apache.activemq.MessageAvailableListener; 
-import org.eclipse.jetty.continuation.Continuation; +import org.apache.activemq.web.async.AsyncServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* - * Listen for available messages and wakeup any continuations. + * Listen for available messages and wakeup any asyncRequests. */ public class AjaxLis
[activemq] branch main updated (3372e01f7 -> 905f00c84)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 3372e01f7 AMQ-9242 - remove compile time dependency log4j-slf4j2-impl dependency (#997) from activemq-partition add 905f00c84 AMQ-9243 - Remove deprecated jetty-continuation dependency (#998) No new revisions were added by this update. Summary of changes: activemq-web-console/pom.xml | 4 - .../java/org/apache/activemq/web/RestTest.java | 34 + activemq-web/pom.xml | 5 - .../java/org/apache/activemq/web/AjaxListener.java | 26 ++-- .../activemq/web/MessageListenerServlet.java | 43 ++ .../org/apache/activemq/web/MessageServlet.java| 71 - .../activemq/web/async/AsyncServletRequest.java| 168 + 7 files changed, 261 insertions(+), 90 deletions(-) create mode 100644 activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java
[activemq] branch activemq-5.18.x updated: AMQ-9242 - remove compile time dependency log4j-slf4j2-impl dependency (#997) from activemq-partition
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.18.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.18.x by this push: new f63328048 AMQ-9242 - remove compile time dependency log4j-slf4j2-impl dependency (#997) from activemq-partition f63328048 is described below commit f63328048f38d47c209e0191f8115436211d6488 Author: Christopher L. Shannon AuthorDate: Wed Apr 12 06:22:56 2023 -0400 AMQ-9242 - remove compile time dependency log4j-slf4j2-impl dependency (#997) from activemq-partition (cherry picked from commit 3372e01f7a9320cf2cba41ee117c0614c3b89c51) --- activemq-partition/pom.xml | 16 +++- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml index 2a398be01..3fb2882db 100644 --- a/activemq-partition/pom.xml +++ b/activemq-partition/pom.xml @@ -32,7 +32,6 @@ Used to partition clients over a cluster of brokers - org.apache.activemq activemq-broker @@ -49,10 +48,6 @@ org.linkedin org.linkedin.zookeeper-impl - - org.apache.logging.log4j - log4j-slf4j2-impl - org.linkedin org.linkedin.util-core @@ -77,6 +72,17 @@ + + org.apache.logging.log4j + log4j-core + test + + + org.apache.logging.log4j + log4j-slf4j2-impl + test + + org.apache.activemq activemq-broker
[activemq] branch main updated (f8695b59f -> 3372e01f7)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from f8695b59f [AMQ-9237] Add META-INF/services to activemq-client-jakarta (#995) add 3372e01f7 AMQ-9242 - remove compile time dependency log4j-slf4j2-impl dependency (#997) from activemq-partition No new revisions were added by this update. Summary of changes: activemq-partition/pom.xml | 16 +++- 1 file changed, 11 insertions(+), 5 deletions(-)
[activemq] branch main updated (5ca71dfee -> f8695b59f)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git from 5ca71dfee [AMQ-9234] Remove JournalJDBC store and activeio dependency (#993) add f8695b59f [AMQ-9237] Add META-INF/services to activemq-client-jakarta (#995) No new revisions were added by this update. Summary of changes: activemq-client-jakarta/pom.xml | 35 --- 1 file changed, 32 insertions(+), 3 deletions(-)
[activemq] branch main updated: AMQ-9161 - Fix javadoc comment so Xbean parsing works (#990)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 2e0eca718 AMQ-9161 - Fix javadoc comment so Xbean parsing works (#990) 2e0eca718 is described below commit 2e0eca71892541423097e43a8d7301db2439dd26 Author: Christopher L. Shannon AuthorDate: Wed Mar 15 07:35:03 2023 -0400 AMQ-9161 - Fix javadoc comment so Xbean parsing works (#990) The previous comment added in PR #989 added a second javadoc comment which prevent Xbean from generating the correct schema --- .../activemq/store/journal/JournalPersistenceAdapter.java| 8 +++- .../store/journal/JournalPersistenceAdapterFactory.java | 12 +--- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index c5d2cc298..0d541c31c 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -82,14 +82,12 @@ import org.slf4j.LoggerFactory; * {@link Journal} and then check pointing asynchronously on a timeout with some * other long term persistent storage. * + * @deprecated - Deprecated for removal as this PersistenceAdapter is no longer used and + * replaced by the JDBCPersistenceAdapter. + * * @org.apache.xbean.XBean * */ - -/** - * Deprecated for removal as this PersistenceAdapter is no longer used and - * replaced by the JDBCPersistenceAdapter. 
- */ @Deprecated(forRemoval = true) public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java index 6a2e51044..418fbc884 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java @@ -36,14 +36,12 @@ import org.slf4j.LoggerFactory; /** * Factory class that can create PersistenceAdapter objects. - * - * @org.apache.xbean.XBean - * - */ - -/** - * Deprecated for removal as this PersistenceAdapter is no longer used and + * + * @deprecated Deprecated for removal as this PersistenceAdapter is no longer used and * replaced by the JDBCPersistenceAdapter. + * + * @org.apache.xbean.XBean + * */ @Deprecated(forRemoval = true) public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
[activemq] branch main updated: [AMQ-8316] Flag deprecated methods in BrokerService (#988)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new c41be2a8d [AMQ-8316] Flag deprecated methods in BrokerService (#988) c41be2a8d is described below commit c41be2a8d765448b825c50388b1c5c137593194a Author: JB Onofré AuthorDate: Tue Mar 14 12:07:56 2023 +0100 [AMQ-8316] Flag deprecated methods in BrokerService (#988) These methods will be removed or renamed in a future release. Co-authored-by: Christopher L. Shannon --- .../org/apache/activemq/broker/BrokerService.java | 31 +++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 64858e9f8..8c969571c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -1672,6 +1672,7 @@ public class BrokerService implements Service { /** * @return Returns the shutdownOnMasterFailure. 
*/ +@Deprecated(forRemoval = true) public boolean isShutdownOnMasterFailure() { return shutdownOnMasterFailure; } @@ -2901,49 +2902,73 @@ public class BrokerService implements Service { this.sslContext = sslContext; } +/** + * @deprecated this method will be renamed to not use slave wording + */ +@Deprecated(forRemoval = true) public boolean isShutdownOnSlaveFailure() { return shutdownOnSlaveFailure; } /** + * @deprecated this method will be renamed to not use slave wording + * * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" */ +@Deprecated(forRemoval = true) public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; } +/** + * @deprecated it will be removed as it should not be used directly. + */ +@Deprecated(forRemoval = true) public boolean isWaitForSlave() { return waitForSlave; } /** + * @deprecated this method will be renamed to not use slave wording + * * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" */ +@Deprecated(forRemoval = true) public void setWaitForSlave(boolean waitForSlave) { this.waitForSlave = waitForSlave; } +/** + * @deprecated this method will be renamed to not use slave wording + */ +@Deprecated(forRemoval = true) public long getWaitForSlaveTimeout() { return this.waitForSlaveTimeout; } +/** + * @deprecated this method will be renamed to not use slave wording + */ +@Deprecated(forRemoval = true) public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { this.waitForSlaveTimeout = waitForSlaveTimeout; } /** - * Get the passiveSlave - * @return the passiveSlave + * @deprecated this method will be renamed to not use slave wording */ +@Deprecated(forRemoval = true) public boolean isPassiveSlave() { return this.passiveSlave; } /** - * Set the passiveSlave + * @deprecated this method will be renamed to not use slave wording + * * @param passiveSlave the passiveSlave to set * @org.apache.xbean.Property 
propertyEditor="org.apache.activemq.util.BooleanEditor" */ +@Deprecated(forRemoval = true) public void setPassiveSlave(boolean passiveSlave) { this.passiveSlave = passiveSlave; }
[activemq] branch main updated: [AMQ-9161] Mark JDBC JournalPersistenceAdapter as deprecated for removal (#989)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 04f589e2b [AMQ-9161] Mark JDBC JournalPersistenceAdapter as deprecated for removal (#989) 04f589e2b is described below commit 04f589e2bd6032ca405e164e035168f9de919fb3 Author: Matt Pavlovich AuthorDate: Tue Mar 14 03:57:50 2023 -0600 [AMQ-9161] Mark JDBC JournalPersistenceAdapter as deprecated for removal (#989) Deprecated for removal as this PersistenceAdapter is no longer used and replaced by the JDBCPersistenceAdapter. --- .../apache/activemq/store/journal/JournalPersistenceAdapter.java| 6 ++ .../activemq/store/journal/JournalPersistenceAdapterFactory.java| 6 ++ 2 files changed, 12 insertions(+) diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 10b5c7a43..c5d2cc298 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -85,6 +85,12 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean * */ + +/** + * Deprecated for removal as this PersistenceAdapter is no longer used and + * replaced by the JDBCPersistenceAdapter. 
+ */ +@Deprecated(forRemoval = true) public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { private BrokerService brokerService; diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java index 21f69d6d4..6a2e51044 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java @@ -40,6 +40,12 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean * */ + +/** + * Deprecated for removal as this PersistenceAdapter is no longer used and + * replaced by the JDBCPersistenceAdapter. + */ +@Deprecated(forRemoval = true) public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory { private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
[activemq] branch main updated: [AMQ-9218] Jakarta transition module for activemq-client (#968)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 2ee6bb0c7 [AMQ-9218] Jakarta transition module for activemq-client (#968) 2ee6bb0c7 is described below commit 2ee6bb0c794b84b9cd886e54be6ce6e833e5164c Author: Matt Pavlovich AuthorDate: Mon Mar 13 06:03:17 2023 -0500 [AMQ-9218] Jakarta transition module for activemq-client (#968) --- activemq-client-jakarta/pom.xml| 183 + .../org/apache/activemq/jakarta/JakartaTest.java | 86 ++ activemq-client/pom.xml| 1 + pom.xml| 46 -- 4 files changed, 302 insertions(+), 14 deletions(-) diff --git a/activemq-client-jakarta/pom.xml b/activemq-client-jakarta/pom.xml new file mode 100644 index 0..3ee72e39d --- /dev/null +++ b/activemq-client-jakarta/pom.xml @@ -0,0 +1,183 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> +4.0.0 + +org.apache.activemq +activemq-parent +5.18.0-SNAPSHOT + +activemq-client-jakarta +bundle +ActiveMQ :: Client Jakarta +Jakarta transition module for ActiveMQ Client implementation + + +org.apache.activemq +activemq-client +provided + + +jakarta.jms +jakarta.jms-api + + + + +jakarta.jms +jakarta.jms-api +${jakarta-jms-api-v3-version} + + +org.fusesource.hawtbuf +hawtbuf +${hawtbuf-version} + + +org.slf4j +slf4j-api + + +javax.jmdns +jmdns +true +provided + + +com.thoughtworks.xstream +xstream +provided + + +junit +junit +test + + + + + +maven-dependency-plugin + + +unpack-source +initialize + +unpack + + + + +org.apache.activemq +activemq-client +sources +jar + ${project.build.directory}/copied-sources/activemq-client + **/META-INF/**,**/zeroconf/** +**/** + + + + + + + +com.google.code.maven-replacer-plugin +replacer + + +initialize + +replace + + + + + + 
${project.build.directory}/copied-sources/activemq-client/**/*.java + +javax.jms +jakarta.jms + +MULTILINE + + + + +maven-resources-plugin + + +copy-resources +generate-sources + +copy-resources + + + ${project.build.directory}/generated-sources + + + ${project.build.directory}/copied-sources/activemq-client + + + + + + + +org.codehaus.mojo +build-helper-maven-plugin + + +add-source +generate-sources + +add-source + + + + ${project.build.directory}/gene
[activemq] branch main updated: AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods (#979)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 9fe24cd8e AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods (#979) 9fe24cd8e is described below commit 9fe24cd8e3f67a7249afa32b01fc46807c5d8997 Author: Christopher L. Shannon AuthorDate: Thu Feb 16 18:03:14 2023 -0500 AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods (#979) AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods --- .../java/org/apache/activemq/ActiveMQProducer.java | 28 +-- .../activemq/command/ActiveMQBytesMessage.java | 17 ++ .../activemq/command/ActiveMQMapMessage.java | 42 - .../apache/activemq/command/ActiveMQMessage.java | 24 ++- .../activemq/command/ActiveMQObjectMessage.java| 31 +++- .../activemq/command/ActiveMQStreamMessage.java| 5 + .../activemq/command/ActiveMQTextMessage.java | 17 ++ .../org/apache/activemq/util/ByteSequence.java | 12 ++ .../activemq/jms2/ActiveMQJMS2MessageTest.java | 196 + 9 files changed, 332 insertions(+), 40 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java index 7a6c621a9..2596bf2aa 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java @@ -108,33 +108,7 @@ public class ActiveMQProducer implements JMSProducer { if (body != null) { try { for (Map.Entry mapEntry : body.entrySet()) { - final String key = mapEntry.getKey(); - final Object value = mapEntry.getValue(); - final Class valueObject = value.getClass(); - if (String.class.isAssignableFrom(valueObject)) { - mapMessage.setString(key, String.class.cast(value)); - } else if (Integer.class.isAssignableFrom(valueObject)) { - 
mapMessage.setInt(key, Integer.class.cast(value)); - } else if (Long.class.isAssignableFrom(valueObject)) { - mapMessage.setLong(key, Long.class.cast(value)); - } else if (Double.class.isAssignableFrom(valueObject)) { - mapMessage.setDouble(key, Double.class.cast(value)); - } else if (Boolean.class.isAssignableFrom(valueObject)) { - mapMessage.setBoolean(key, Boolean.class.cast(value)); - } else if (Character.class.isAssignableFrom(valueObject)) { - mapMessage.setChar(key, Character.class.cast(value)); - } else if (Short.class.isAssignableFrom(valueObject)) { - mapMessage.setShort(key, Short.class.cast(value)); - } else if (Float.class.isAssignableFrom(valueObject)) { - mapMessage.setFloat(key, Float.class.cast(value)); - } else if (Byte.class.isAssignableFrom(valueObject)) { - mapMessage.setByte(key, Byte.class.cast(value)); - } else if (byte[].class.isAssignableFrom(valueObject)) { - byte[] array = byte[].class.cast(value); - mapMessage.setBytes(key, array, 0, array.length); - } else { - mapMessage.setObject(key, value); - } + mapMessage.setObject(mapEntry.getKey(), mapEntry.getValue()); } } catch (JMSException e) { throw new MessageFormatRuntimeException(e.getMessage()); diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index 14028d732..2800050b6 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -956,4 +956,21 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag } } } + +@Override +@SuppressWarnings("unchecked") +public boolean isBodyAssignableTo(Class c) { +return getContent() == null || c.isAssignableFrom(byte[].class); +} + +@SuppressWarnings("unchecked") +protected T doGetBody(Class asType) { +//Make sure the bytes are stored before trying to copy and return 
+if (dataOut != null && getContent() == null) { +storeContent(); +} + +final ByteSequence content = getContent(); +return content != null ?
[activemq] 01/02: [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 47962eea259f4779e7b5f019d24bfdd1e5b552e8 Author: Matt Pavlovich AuthorDate: Thu Feb 2 16:30:28 2023 -0600 [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy (cherry picked from commit 28f7eb7ee87c47e43cc3db11fcd550ef872327b3) --- .../region/policy/AbstractDeadLetterStrategy.java | 22 ++-- .../policy/IndividualDeadLetterStrategy.java | 48 .../region/policy/SharedDeadLetterStrategy.java| 27 + .../broker/policy/IndividualDeadLetterTest.java| 127 + 4 files changed, 209 insertions(+), 15 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index fa6532b33..82e07560e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -31,13 +31,12 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { private boolean processNonPersistent = false; private boolean processExpired = true; private boolean enableAudit = true; -private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); private long expiration; @Override public void rollback(Message message) { if (message != null && this.enableAudit) { -messageAudit.rollback(message); +lookupActiveMQMessageAudit(message).rollback(message); } } @@ -46,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { boolean result = false; if (message != null) { result = true; -if (enableAudit && messageAudit.isDuplicate(message)) { +if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) { result 
= false; LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); } @@ -108,20 +107,13 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { this.expiration = expiration; } -public int getMaxProducersToAudit() { -return messageAudit.getMaximumNumberOfProducersToTrack(); -} +public abstract int getMaxProducersToAudit(); -public void setMaxProducersToAudit(int maxProducersToAudit) { -messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); -} +public abstract void setMaxProducersToAudit(int maxProducersToAudit); -public void setMaxAuditDepth(int maxAuditDepth) { -messageAudit.setAuditDepth(maxAuditDepth); -} +public abstract void setMaxAuditDepth(int maxAuditDepth); -public int getMaxAuditDepth() { -return messageAudit.getAuditDepth(); -} +public abstract int getMaxAuditDepth(); +protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 1dfaa1566..3dd41ae0b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; @@ -23,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; +import 
org.apache.activemq.util.LRUCache; /** * A {@link DeadLetterStrategy} where each destination has its own individual @@ -40,6 +42,10 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { private boolean useQueueForQueueMessages = true; private boolean useQueueForTopicMessages = true; private boolean destinationPerDurableSubscriber; +private int maxAuditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; +private int maxProducersToAudit = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + +private final LRUCache dedicatedMess
[activemq] 02/02: AMQ-9217 - Fix IndividualDeadLetter strategy rollback
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git commit 50943caa6e30c7f5bf77db91340d7e1d73c21faa Author: Christopher L. Shannon (cshannon) AuthorDate: Thu Feb 16 07:21:20 2023 -0500 AMQ-9217 - Fix IndividualDeadLetter strategy rollback This fixes the rollback after the latest changes by using the originalDestination property to look up the correct message audit on rollback (cherry picked from commit 459388185a3acd3a175d304b08af3c638e870292) --- .../broker/region/policy/AbstractDeadLetterStrategy.java | 6 +++--- .../broker/region/policy/IndividualDeadLetterStrategy.java | 14 +++--- .../broker/region/policy/SharedDeadLetterStrategy.java | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 82e07560e..242120681 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { @Override public void rollback(Message message) { if (message != null && this.enableAudit) { -lookupActiveMQMessageAudit(message).rollback(message); +lookupActiveMQMessageAudit(message, true).rollback(message); } } @@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { boolean result = false; if (message != null) { result = true; -if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) { +if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) { result = false; LOG.debug("Not adding duplicate to 
DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); } @@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { public abstract int getMaxAuditDepth(); -protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); +protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 3dd41ae0b..f626365fa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { } @Override -protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { +protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) { ActiveMQMessageAudit messageAudit; synchronized(dedicatedMessageAudits) { -messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName()); +// Normally we want to just use the destination property on the message as the key for the map for +// caching the messageAudit object for each destination. However, when rolling back, the message +// provided here has had its destination changed to the individual DLQ destination and is no longer +// the original destination. So to find the correct messageAudit to rollback we need to use +// the originalDestination property on the message to get the correct destination that was +// used to first cache the messageAudit. +final String destinationName = rollback && message.getOriginalDestination() != null ? 
+message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName(); +messageAudit = dedicatedMessageAudits.get(destinationName); if(messageAudit == null) { messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit()); - dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit); +dedicatedMessageAudits.put(destinationName, messageAudit); } return messageAudit; diff --git a/activemq-broker/src/m
[activemq] branch activemq-5.17.x updated (b7925d843 -> 50943caa6)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a change to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git from b7925d843 Update copyright to include 2023 new 47962eea2 [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy new 50943caa6 AMQ-9217 - Fix IndividualDeadLetter strategy rollback The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../region/policy/AbstractDeadLetterStrategy.java | 22 ++-- .../policy/IndividualDeadLetterStrategy.java | 56 + .../region/policy/SharedDeadLetterStrategy.java| 27 + .../broker/policy/IndividualDeadLetterTest.java| 127 + 4 files changed, 217 insertions(+), 15 deletions(-)
[activemq] branch main updated: AMQ-9217 - Fix IndividualDeadLetter strategy rollback
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 459388185 AMQ-9217 - Fix IndividualDeadLetter strategy rollback new ae4e305f8 Merge pull request #980 from cshannon/amq-9217-rollback 459388185 is described below commit 459388185a3acd3a175d304b08af3c638e870292 Author: Christopher L. Shannon (cshannon) AuthorDate: Thu Feb 16 07:21:20 2023 -0500 AMQ-9217 - Fix IndividualDeadLetter strategy rollback This fixes the rollback after the latest changes by using the originalDestination property to look up the correct message audit on rollback --- .../broker/region/policy/AbstractDeadLetterStrategy.java | 6 +++--- .../broker/region/policy/IndividualDeadLetterStrategy.java | 14 +++--- .../broker/region/policy/SharedDeadLetterStrategy.java | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 82e07560e..242120681 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { @Override public void rollback(Message message) { if (message != null && this.enableAudit) { -lookupActiveMQMessageAudit(message).rollback(message); +lookupActiveMQMessageAudit(message, true).rollback(message); } } @@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { boolean result = false; if (message != null) { result = true; -if (enableAudit && 
lookupActiveMQMessageAudit(message).isDuplicate(message)) { +if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) { result = false; LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); } @@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { public abstract int getMaxAuditDepth(); -protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); +protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 3dd41ae0b..f626365fa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { } @Override -protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { +protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) { ActiveMQMessageAudit messageAudit; synchronized(dedicatedMessageAudits) { -messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName()); +// Normally we want to just use the destination property on the message as the key for the map for +// caching the messageAudit object for each destination. However, when rolling back, the message +// provided here has had its destination changed to the individual DLQ destination and is no longer +// the original destination. 
So to find the correct messageAudit to rollback we need to use +// the originalDestination property on the message to get the correct destination that was +// used to first cache the messageAudit. +final String destinationName = rollback && message.getOriginalDestination() != null ? +message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName(); +messageAudit = dedicatedMessageAudits.get(destinationName); if(messageAudit == null) { messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit()); - dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit); +
[activemq] branch main updated: AMQ-7309 - remove test after adding support for jms 2 create session
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new ee1477830 AMQ-7309 - remove test after adding support for jms 2 create session ee1477830 is described below commit ee1477830511a71d89a1e987191c271cb8c0b5a7 Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Feb 15 18:05:17 2023 -0500 AMQ-7309 - remove test after adding support for jms 2 create session --- .../test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java | 5 - 1 file changed, 5 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java index 4891603ab..a6ecaf070 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java @@ -219,11 +219,6 @@ public class ActiveMQJMS2ContextTest extends ActiveMQJMS2TestBase { connection.createSharedDurableConnectionConsumer(null, null, null, null, 10); } -@Test(expected = UnsupportedOperationException.class) -public void testSessionAckMode() throws JMSException { -connection.createSession(Session.AUTO_ACKNOWLEDGE); -} - @Test public void testSessionDurableConsumer() throws JMSException { try(JMSContext jmsContext = activemqConnectionFactory.createContext()) {
[activemq] branch main updated: AMQ-7309 - Implement JMS 2.0 createSession methods
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 2c69e3074 AMQ-7309 - Implement JMS 2.0 createSession methods new 160840d3b Merge pull request #978 from cshannon/AMQ-7309-session 2c69e3074 is described below commit 2c69e30748c361ac3d62b6b3b37e703ac64a220b Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Feb 15 08:30:21 2023 -0500 AMQ-7309 - Implement JMS 2.0 createSession methods Adds a real implementation for createSession() and createSession(ackMode) methods on ActiveMQConnection --- .../org/apache/activemq/ActiveMQConnection.java| 6 +- .../activemq/jms2/ActiveMQJMS2ConnectionTest.java | 65 ++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 58d419725..2ccac1d99 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -317,7 +317,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon */ @Override public Session createSession() throws JMSException { - throw new UnsupportedOperationException("createSession() is unsupported"); +return createSession(false, Session.AUTO_ACKNOWLEDGE); } /** @@ -340,8 +340,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @since 2.0 */ @Override -public Session createSession(int sessionMode) throws JMSException { -throw new UnsupportedOperationException("createSession(int sessionMode) is unsupported"); +public Session createSession(int acknowledgeMode) throws JMSException { +return createSession(acknowledgeMode == Session.SESSION_TRANSACTED, acknowledgeMode); } /** diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java new file mode 100644 index 0..d6765ad38 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.jms2; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.JMSException; +import javax.jms.Session; +import org.junit.Test; + +public class ActiveMQJMS2ConnectionTest extends ActiveMQJMS2TestBase { + +@Test +public void testCreateSession() throws Exception { +verifySession(connection.createSession(), Session.AUTO_ACKNOWLEDGE); +} + +@Test +public void testCreateSessionAckModeAuto() throws Exception { +verifySession(connection.createSession(Session.AUTO_ACKNOWLEDGE), Session.AUTO_ACKNOWLEDGE); +} + +@Test +public void testCreateSessionAckModeClient() throws Exception { +verifySession(connection.createSession(Session.CLIENT_ACKNOWLEDGE), Session.CLIENT_ACKNOWLEDGE); +} + +@Test +public void testCreateSessionAckModeDups() throws Exception { +verifySession(connection.createSession(Session.DUPS_OK_ACKNOWLEDGE), Session.DUPS_OK_ACKNOWLEDGE); +} + +@Test +public void testCreateSessionAckModeTrans() throws Exception { +verifySession(connection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED); +} + +private void verifySession(Session session, int acknowledgeMode) throws JMSException { +try { +assertNotNull(session); +assertEquals(acknowledgeMode, session.getAcknowledgeMode()); +assertEquals(acknowledgeMode == Session.SESSION_TRANSACTED, session.getTransacted()); +} finally { +if (session != null) { +session.close(); +} +} +} + +}
[activemq] branch main updated: [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new 28f7eb7ee [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy new dd6118e68 Merge pull request #965 from mattrpav/AMQ-9217 28f7eb7ee is described below commit 28f7eb7ee87c47e43cc3db11fcd550ef872327b3 Author: Matt Pavlovich AuthorDate: Thu Feb 2 16:30:28 2023 -0600 [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy --- .../region/policy/AbstractDeadLetterStrategy.java | 22 ++-- .../policy/IndividualDeadLetterStrategy.java | 48 .../region/policy/SharedDeadLetterStrategy.java| 27 + .../broker/policy/IndividualDeadLetterTest.java| 127 + 4 files changed, 209 insertions(+), 15 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index fa6532b33..82e07560e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -31,13 +31,12 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { private boolean processNonPersistent = false; private boolean processExpired = true; private boolean enableAudit = true; -private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); private long expiration; @Override public void rollback(Message message) { if (message != null && this.enableAudit) { -messageAudit.rollback(message); +lookupActiveMQMessageAudit(message).rollback(message); } } @@ -46,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { boolean result = false; if (message != null) 
{ result = true; -if (enableAudit && messageAudit.isDuplicate(message)) { +if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) { result = false; LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); } @@ -108,20 +107,13 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { this.expiration = expiration; } -public int getMaxProducersToAudit() { -return messageAudit.getMaximumNumberOfProducersToTrack(); -} +public abstract int getMaxProducersToAudit(); -public void setMaxProducersToAudit(int maxProducersToAudit) { -messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); -} +public abstract void setMaxProducersToAudit(int maxProducersToAudit); -public void setMaxAuditDepth(int maxAuditDepth) { -messageAudit.setAuditDepth(maxAuditDepth); -} +public abstract void setMaxAuditDepth(int maxAuditDepth); -public int getMaxAuditDepth() { -return messageAudit.getAuditDepth(); -} +public abstract int getMaxAuditDepth(); +protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 1dfaa1566..3dd41ae0b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; @@ -23,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination; import 
org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; +import org.apache.activemq.util.LRUCache; /** * A {@link DeadLetterStrategy} where each destination has its own individual @@ -40,6 +42,10 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { private boolean useQueueForQueueMessages = true; private boolean useQueueForTopicMessages = true; private boolean destinationPerDurableSubscriber; +private int maxAuditDepth = Active
[activemq] branch activemq-5.17.x updated: AMQ-9202 - Make sure Reentrant locks are acquired outside a try block
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new 6d91d71c1 AMQ-9202 - Make sure Reentrant locks are acquired outside a try block 6d91d71c1 is described below commit 6d91d71c1acab166a607425e88a228ba28d06bab Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Feb 1 11:17:31 2023 -0500 AMQ-9202 - Make sure Reentrant locks are acquired outside a try block This is best practice and will prevent unlock from being attempted inside of a finally block when the thread doesn't actually own the lock which can happen when the lock attempt throws an exception such as calling lockInterruptibly() (cherry picked from commit ed924cddac90b96bdc47b215852a68155d818bcd) --- .../org/apache/activemq/broker/region/Queue.java | 2 +- .../activemq/store/kahadb/MessageDatabase.java | 74 +++--- .../plugin/AbstractRuntimeConfigurationBroker.java | 10 +-- 3 files changed, 42 insertions(+), 44 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 4bc669046..79f897ec1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; +sendLock.lock(); try { -sendLock.lock(); long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. 
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 55137db18..6687c56b4 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2070,28 +2070,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe //flag to know whether the ack forwarding completed without an exception boolean forwarded = false; +//acquire the checkpoint lock to prevent other threads from +//running a checkpoint while this is running +// +//Normally this task runs on the same executor as the checkpoint task +//so this ack compaction runner wouldn't run at the same time as the checkpoint task. +// +//However, there are two cases where this isn't always true. +//First, the checkpoint() method is public and can be called through the +//PersistenceAdapter interface by someone at the same time this is running. +//Second, a checkpoint is called during shutdown without using the executor. +// +//In the future it might be better to just remove the checkpointLock entirely +//and only use the executor but this would need to be examined for any unintended +//consequences +checkpointLock.readLock().lock(); try { -//acquire the checkpoint lock to prevent other threads from -//running a checkpoint while this is running -// -//Normally this task runs on the same executor as the checkpoint task -//so this ack compaction runner wouldn't run at the same time as the checkpoint task. -// -//However, there are two cases where this isn't always true. -//First, the checkpoint() method is public and can be called through the -//PersistenceAdapter interface by someone at the same time this is running. -//Second, a checkpoint is called during shutdown without using the executor. 
-// -//In the future it might be better to just remove the checkpointLock entirely -//and only use the executor but this would need to be examined for any unintended -//consequences -checkpointLock.readLock().lock(); - +// Lock index to capture the ackMessageFileMap data +indexLock.writeLock().lock(); try { - -// Lock index to capture the ackMessageFileMap data -indexLock.writeLock
[activemq] branch main updated: AMQ-9202 - Make sure Reentrant locks are acquired outside a try block
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git The following commit(s) were added to refs/heads/main by this push: new ed924cdda AMQ-9202 - Make sure Reentrant locks are acquired outside a try block new 3f68f4993 Merge pull request #959 from cshannon/AMQ-9202 ed924cdda is described below commit ed924cddac90b96bdc47b215852a68155d818bcd Author: Christopher L. Shannon (cshannon) AuthorDate: Wed Feb 1 11:17:31 2023 -0500 AMQ-9202 - Make sure Reentrant locks are acquired outside a try block This is best practice and will prevent unlock from being attempted inside of a finally block when the thread doesn't actually own the lock which can happen when the lock attempt throws an exception such as calling lockInterruptibly() --- .../org/apache/activemq/broker/region/Queue.java | 2 +- .../activemq/store/kahadb/MessageDatabase.java | 74 +++--- .../plugin/AbstractRuntimeConfigurationBroker.java | 10 +-- 3 files changed, 42 insertions(+), 44 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 4bc669046..79f897ec1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; +sendLock.lock(); try { -sendLock.lock(); long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. 
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 55137db18..6687c56b4 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2070,28 +2070,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe //flag to know whether the ack forwarding completed without an exception boolean forwarded = false; +//acquire the checkpoint lock to prevent other threads from +//running a checkpoint while this is running +// +//Normally this task runs on the same executor as the checkpoint task +//so this ack compaction runner wouldn't run at the same time as the checkpoint task. +// +//However, there are two cases where this isn't always true. +//First, the checkpoint() method is public and can be called through the +//PersistenceAdapter interface by someone at the same time this is running. +//Second, a checkpoint is called during shutdown without using the executor. +// +//In the future it might be better to just remove the checkpointLock entirely +//and only use the executor but this would need to be examined for any unintended +//consequences +checkpointLock.readLock().lock(); try { -//acquire the checkpoint lock to prevent other threads from -//running a checkpoint while this is running -// -//Normally this task runs on the same executor as the checkpoint task -//so this ack compaction runner wouldn't run at the same time as the checkpoint task. -// -//However, there are two cases where this isn't always true. -//First, the checkpoint() method is public and can be called through the -//PersistenceAdapter interface by someone at the same time this is running. -//Second, a checkpoint is called during shutdown without using the executor. 
-// -//In the future it might be better to just remove the checkpointLock entirely -//and only use the executor but this would need to be examined for any unintended -//consequences -checkpointLock.readLock().lock(); - +// Lock index to capture the ackMessageFileMap data +indexLock.writeLock().lock(); try { - -// Lock index to capture the ackMessageFileMap data -indexLock.writeLock().lock