This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 4f9898d69e149373a5078d5bb54adde8936b1a52 Author: gchuf <[email protected]> AuthorDate: Fri Nov 7 09:32:29 2025 +0100 ARTEMIS-5752 Fix CRLF line endings for java files Fixes line endings for these files from CRLF to LF by running git renormalize --- .../impl/SlowConsumerThresholdMeasurementUnit.java | 106 +-- .../replication/RepeatStartBackupTest.java | 716 ++++++++++----------- .../retention/ReplayWithReplicationTest.java | 306 ++++----- .../RedeployTempTest-reload-temp-updated.xml | 242 +++---- .../resources/RedeployTempTest-reload-temp.xml | 242 +++---- 5 files changed, 806 insertions(+), 806 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java index 0b092f66f1..c3db07f09b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerThresholdMeasurementUnit.java @@ -1,53 +1,53 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.settings.impl; - -import java.util.concurrent.TimeUnit; - -public enum SlowConsumerThresholdMeasurementUnit { - MESSAGES_PER_SECOND(1), MESSAGES_PER_MINUTE(60), MESSAGES_PER_HOUR(3600), MESSAGES_PER_DAY(3600 * 24); - - private final int measurementUnitInSeconds; - - SlowConsumerThresholdMeasurementUnit(int measurementUnitInSeconds) { - this.measurementUnitInSeconds = measurementUnitInSeconds; - } - - public static SlowConsumerThresholdMeasurementUnit valueOf(int measurementUnitInSeconds) { - return switch (measurementUnitInSeconds) { - case 1 -> MESSAGES_PER_SECOND; - case 60 -> MESSAGES_PER_MINUTE; - case 3600 -> MESSAGES_PER_HOUR; - case 3600 * 24 -> MESSAGES_PER_DAY; - default -> null; - }; - } - - public static TimeUnit unitOf(int measurementUnitInSeconds) { - return switch (measurementUnitInSeconds) { - case 1 -> TimeUnit.SECONDS; - case 60 -> TimeUnit.MINUTES; - case 3600 -> TimeUnit.HOURS; - case 3600 * 24 -> TimeUnit.DAYS; - default -> null; - }; - } - - public int getValue() { - return measurementUnitInSeconds; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.settings.impl; + +import java.util.concurrent.TimeUnit; + +public enum SlowConsumerThresholdMeasurementUnit { + MESSAGES_PER_SECOND(1), MESSAGES_PER_MINUTE(60), MESSAGES_PER_HOUR(3600), MESSAGES_PER_DAY(3600 * 24); + + private final int measurementUnitInSeconds; + + SlowConsumerThresholdMeasurementUnit(int measurementUnitInSeconds) { + this.measurementUnitInSeconds = measurementUnitInSeconds; + } + + public static SlowConsumerThresholdMeasurementUnit valueOf(int measurementUnitInSeconds) { + return switch (measurementUnitInSeconds) { + case 1 -> MESSAGES_PER_SECOND; + case 60 -> MESSAGES_PER_MINUTE; + case 3600 -> MESSAGES_PER_HOUR; + case 3600 * 24 -> MESSAGES_PER_DAY; + default -> null; + }; + } + + public static TimeUnit unitOf(int measurementUnitInSeconds) { + return switch (measurementUnitInSeconds) { + case 1 -> TimeUnit.SECONDS; + case 60 -> TimeUnit.MINUTES; + case 3600 -> TimeUnit.HOURS; + case 3600 * 24 -> TimeUnit.DAYS; + default -> null; + }; + } + + public int getValue() { + return measurementUnitInSeconds; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/RepeatStartBackupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/RepeatStartBackupTest.java index 25ad31a88a..bd4d0d0ed1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/RepeatStartBackupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/RepeatStartBackupTest.java @@ -1,358 +1,358 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.replication; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import java.io.File; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.util.collection.LongObjectHashMap; -import org.apache.activemq.artemis.api.core.QueueConfiguration; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; -import org.apache.activemq.artemis.core.config.ha.DistributedLockManagerConfiguration; -import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; -import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; -import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.journal.collections.JournalHashMap; -import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServers; -import org.apache.activemq.artemis.core.server.JournalType; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.impl.AckReason; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; -import org.apache.activemq.artemis.logs.AssertionLoggerHandler; -import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager; -import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.tests.util.CFUtil; -import org.apache.activemq.artemis.utils.RandomUtil; -import org.apache.activemq.artemis.tests.util.Wait; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class RepeatStartBackupTest extends ActiveMQTestBase { - - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private DistributedLockManagerConfiguration managerConfiguration; - ActiveMQServer backupServer; - ActiveMQServer server; - - File newTemporaryFolder(String name) { - File newFolder = new File(temporaryFolder, name); - newFolder.mkdirs(); - return newFolder; - } - - @BeforeEach - @Override - public void setUp() throws Exception { - managerConfiguration = new DistributedLockManagerConfiguration(FileBasedLockManager.class.getName(), Collections.singletonMap("locks-folder", newTemporaryFolder("manager").toString())); - - // start live - Configuration liveConfiguration = createLiveConfiguration(); - - server = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration)); - server.setIdentity("PRIMARY"); - server.getConfiguration().setJournalFileSize(100 * 1024); - - server.start(); - - server.addAddressInfo(new AddressInfo("t1").addRoutingType(RoutingType.ANYCAST)); - server.createQueue(QueueConfiguration.of("t1").setAddress("t1").setRoutingType(RoutingType.ANYCAST)); - - // start backup - Configuration backupConfiguration = createBackupConfiguration(); - ((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true); - backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration)); - backupServer.setIdentity("BACKUP"); - } - - private void startBackup(int timeout) throws Exception { - backupServer.start(); - - Wait.waitFor(backupServer::isStarted); - - Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout); - } - - @Test - public void testLoopStart() throws Exception { - startBackup(30_000); - - try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { - - ExecutorService executorService = Executors.newFixedThreadPool(1); - runAfter(executorService::shutdownNow); - - AtomicInteger errors = new AtomicInteger(0); - AtomicBoolean running = new AtomicBoolean(true); - - runAfter(() -> running.set(false)); - CountDownLatch latch = new CountDownLatch(1); - - executorService.execute(() -> { - try { - ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); - try (Connection connection = factory.createConnection()) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue("t1")); - MessageProducer producer = session.createProducer(session.createQueue("t1")); - connection.start(); - while (running.get()) { - producer.send(session.createTextMessage("hello")); - assertNotNull(consumer.receive(1000)); - } - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - errors.incrementAndGet(); - } finally { - latch.countDown(); - } - }); - - for (int i = 0; i < 5; i++) { - logger.info("\n*******************************************************************************************************************************\ntest {}\n*******************************************************************************************************************************", i); - backupServer.stop(); - Wait.assertFalse(backupServer::isStarted); - backupServer.start(); - Wait.assertTrue(backupServer::isStarted); - if (i % 2 == 1) { - Wait.assertTrue(backupServer::isReplicaSync); - } - - assertFalse(loggerHandler.findText("AMQ229254")); - assertFalse(loggerHandler.findText("AMQ229006")); - loggerHandler.clear(); - } - - running.set(false); - - assertTrue(latch.await(10, TimeUnit.SECONDS)); - - assertEquals(0, errors.get()); - } - } - - @Test - public void testAckManagerRepetition() throws Exception { - - String queueName = "queue_" + RandomUtil.randomUUIDString(); - - // some extremely large retry settings - // just to make sure these records will never be removed - server.getConfiguration().setMirrorAckManagerQueueAttempts(300000); - server.getConfiguration().setMirrorAckManagerPageAttempts(300000); - server.getConfiguration().setMirrorAckManagerRetryDelay(60_000); - backupServer.getConfiguration().setMirrorAckManagerQueueAttempts(300000); - backupServer.getConfiguration().setMirrorAckManagerPageAttempts(300000); - backupServer.getConfiguration().setMirrorAckManagerRetryDelay(60_000); - - ExecutorService executorService = Executors.newFixedThreadPool(2); - runAfter(executorService::shutdownNow); - - AtomicInteger errors = new AtomicInteger(0); - AtomicBoolean running = new AtomicBoolean(true); - - runAfter(() -> running.set(false)); - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch backupStarted = new CountDownLatch(1); - - AtomicInteger recordsSent = new AtomicInteger(0); - - int starBackupAt = 100; - assertFalse(server.isReplicaSync()); - assertFalse(backupServer.isStarted()); - - AckManager liveAckManager = AckManagerProvider.getManager(server); - server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST)); - Queue queueOnServerLive = server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(true)); - long queueIdOnServerLive = queueOnServerLive.getID(); - - OperationContextImpl context = new OperationContextImpl(server.getExecutorFactory().getExecutor()); - - executorService.execute(() -> { - try { - OperationContextImpl.setContext(context); - while (running.get()) { - int id = recordsSent.getAndIncrement(); - if (id == starBackupAt) { - executorService.execute(() -> { - try { - backupServer.start(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } finally { - backupStarted.countDown(); - } - }); - } - CountDownLatch latchAcked = new CountDownLatch(1); - liveAckManager.ack(server.getNodeID().toString(), queueOnServerLive, id, AckReason.NORMAL, true); - OperationContextImpl.getContext().executeOnCompletion(new IOCallback() { - @Override - public void done() { - latchAcked.countDown(); - } - - @Override - public void onError(int errorCode, String errorMessage) { - } - }); - if (!latchAcked.await(10, TimeUnit.SECONDS)) { - logger.warn("Could not wait ack to finish"); - } - Thread.yield(); - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - errors.incrementAndGet(); - } finally { - latch.countDown(); - } - }); - - assertTrue(backupStarted.await(10, TimeUnit.SECONDS)); - - Wait.assertTrue(server::isReplicaSync); - Wait.assertTrue(() -> recordsSent.get() > 200); - running.set(false); - assertTrue(latch.await(10, TimeUnit.SECONDS)); - - assertEquals(0, errors.get()); - - validateAckManager(server, queueName, queueIdOnServerLive, recordsSent.get()); - - server.stop(); - Wait.assertTrue(backupServer::isActive); - - validateAckManager(backupServer, queueName, queueIdOnServerLive, recordsSent.get()); - } - - private void validateAckManager(ActiveMQServer server, - String queueName, - long queueIdOnServerLive, - int messagesSent) { - AckManager liveManager = AckManagerProvider.getManager(server); - Map<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = liveManager.sortRetries(); - assertEquals(1, sortedRetries.size()); - - LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> retryAddress = sortedRetries.get(SimpleString.of(queueName)); - JournalHashMap<AckRetry, AckRetry, Queue> journalHashMapBackup = retryAddress.get(queueIdOnServerLive); - assertEquals(messagesSent, journalHashMapBackup.size()); - } - - protected HAPolicyConfiguration createReplicationLiveConfiguration() { - ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault(); - haPolicy.setDistributedManagerConfiguration(managerConfiguration); - return haPolicy; - } - - protected HAPolicyConfiguration createReplicationBackupConfiguration() { - ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault(); - haPolicy.setDistributedManagerConfiguration(managerConfiguration); - haPolicy.setClusterName("cluster"); - return haPolicy; - } - - protected Configuration createLiveConfiguration() throws Exception { - Configuration conf = new ConfigurationImpl(); - conf.setJournalType(JournalType.NIO); - conf.setName("localhost::live"); - - File liveDir = newTemporaryFolder("live"); - conf.setBrokerInstance(liveDir); - - conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); - conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); - conf.addConnectorConfiguration("live", "tcp://localhost:61616"); - - conf.setClusterUser("mycluster"); - conf.setClusterPassword("mypassword"); - - conf.setHAPolicyConfiguration(createReplicationLiveConfiguration()); - - ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); - ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); - ccconf.setName("cluster"); - ccconf.setConnectorName("live"); - conf.addClusterConfiguration(ccconf); - - conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.NIO).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); - - return conf; - } - - protected Configuration createBackupConfiguration() throws Exception { - Configuration conf = new ConfigurationImpl(); - conf.setName("localhost::backup"); - - File backupDir = newTemporaryFolder("backup"); - conf.setBrokerInstance(backupDir); - - conf.setHAPolicyConfiguration(createReplicationBackupConfiguration()); - - conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); - conf.addConnectorConfiguration("live", "tcp://localhost:61616"); - conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); - - conf.setClusterUser("mycluster"); - conf.setClusterPassword("mypassword"); - - ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); - ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); - ccconf.setName("cluster"); - ccconf.setConnectorName("backup"); - conf.addClusterConfiguration(ccconf); - - conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.NIO).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); - - return conf; - } - - - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.replication; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.DistributedLockManagerConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.journal.collections.JournalHashMap; +import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RepeatStartBackupTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private DistributedLockManagerConfiguration managerConfiguration; + ActiveMQServer backupServer; + ActiveMQServer server; + + File newTemporaryFolder(String name) { + File newFolder = new File(temporaryFolder, name); + newFolder.mkdirs(); + return newFolder; + } + + @BeforeEach + @Override + public void setUp() throws Exception { + managerConfiguration = new DistributedLockManagerConfiguration(FileBasedLockManager.class.getName(), Collections.singletonMap("locks-folder", newTemporaryFolder("manager").toString())); + + // start live + Configuration liveConfiguration = createLiveConfiguration(); + + server = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration)); + server.setIdentity("PRIMARY"); + server.getConfiguration().setJournalFileSize(100 * 1024); + + server.start(); + + server.addAddressInfo(new AddressInfo("t1").addRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of("t1").setAddress("t1").setRoutingType(RoutingType.ANYCAST)); + + // start backup + Configuration backupConfiguration = createBackupConfiguration(); + ((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true); + backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration)); + backupServer.setIdentity("BACKUP"); + } + + private void startBackup(int timeout) throws Exception { + backupServer.start(); + + Wait.waitFor(backupServer::isStarted); + + Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout); + } + + @Test + public void testLoopStart() throws Exception { + startBackup(30_000); + + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + + ExecutorService executorService = Executors.newFixedThreadPool(1); + runAfter(executorService::shutdownNow); + + AtomicInteger errors = new AtomicInteger(0); + AtomicBoolean running = new AtomicBoolean(true); + + runAfter(() -> running.set(false)); + CountDownLatch latch = new CountDownLatch(1); + + executorService.execute(() -> { + try { + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("t1")); + MessageProducer producer = session.createProducer(session.createQueue("t1")); + connection.start(); + while (running.get()) { + producer.send(session.createTextMessage("hello")); + assertNotNull(consumer.receive(1000)); + } + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + + for (int i = 0; i < 5; i++) { + logger.info("\n*******************************************************************************************************************************\ntest {}\n*******************************************************************************************************************************", i); + backupServer.stop(); + Wait.assertFalse(backupServer::isStarted); + backupServer.start(); + Wait.assertTrue(backupServer::isStarted); + if (i % 2 == 1) { + Wait.assertTrue(backupServer::isReplicaSync); + } + + assertFalse(loggerHandler.findText("AMQ229254")); + assertFalse(loggerHandler.findText("AMQ229006")); + loggerHandler.clear(); + } + + running.set(false); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(0, errors.get()); + } + } + + @Test + public void testAckManagerRepetition() throws Exception { + + String queueName = "queue_" + RandomUtil.randomUUIDString(); + + // some extremely large retry settings + // just to make sure these records will never be removed + server.getConfiguration().setMirrorAckManagerQueueAttempts(300000); + server.getConfiguration().setMirrorAckManagerPageAttempts(300000); + server.getConfiguration().setMirrorAckManagerRetryDelay(60_000); + backupServer.getConfiguration().setMirrorAckManagerQueueAttempts(300000); + backupServer.getConfiguration().setMirrorAckManagerPageAttempts(300000); + backupServer.getConfiguration().setMirrorAckManagerRetryDelay(60_000); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + runAfter(executorService::shutdownNow); + + AtomicInteger errors = new AtomicInteger(0); + AtomicBoolean running = new AtomicBoolean(true); + + runAfter(() -> running.set(false)); + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch backupStarted = new CountDownLatch(1); + + AtomicInteger recordsSent = new AtomicInteger(0); + + int starBackupAt = 100; + assertFalse(server.isReplicaSync()); + assertFalse(backupServer.isStarted()); + + AckManager liveAckManager = AckManagerProvider.getManager(server); + server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST)); + Queue queueOnServerLive = server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + long queueIdOnServerLive = queueOnServerLive.getID(); + + OperationContextImpl context = new OperationContextImpl(server.getExecutorFactory().getExecutor()); + + executorService.execute(() -> { + try { + OperationContextImpl.setContext(context); + while (running.get()) { + int id = recordsSent.getAndIncrement(); + if (id == starBackupAt) { + executorService.execute(() -> { + try { + backupServer.start(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } finally { + backupStarted.countDown(); + } + }); + } + CountDownLatch latchAcked = new CountDownLatch(1); + liveAckManager.ack(server.getNodeID().toString(), queueOnServerLive, id, AckReason.NORMAL, true); + OperationContextImpl.getContext().executeOnCompletion(new IOCallback() { + @Override + public void done() { + latchAcked.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + } + }); + if (!latchAcked.await(10, TimeUnit.SECONDS)) { + logger.warn("Could not wait ack to finish"); + } + Thread.yield(); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + + assertTrue(backupStarted.await(10, TimeUnit.SECONDS)); + + Wait.assertTrue(server::isReplicaSync); + Wait.assertTrue(() -> recordsSent.get() > 200); + running.set(false); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(0, errors.get()); + + validateAckManager(server, queueName, queueIdOnServerLive, recordsSent.get()); + + server.stop(); + Wait.assertTrue(backupServer::isActive); + + validateAckManager(backupServer, queueName, queueIdOnServerLive, recordsSent.get()); + } + + private void validateAckManager(ActiveMQServer server, + String queueName, + long queueIdOnServerLive, + int messagesSent) { + AckManager liveManager = AckManagerProvider.getManager(server); + Map<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = liveManager.sortRetries(); + assertEquals(1, sortedRetries.size()); + + LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> retryAddress = sortedRetries.get(SimpleString.of(queueName)); + JournalHashMap<AckRetry, AckRetry, Queue> journalHashMapBackup = retryAddress.get(queueIdOnServerLive); + assertEquals(messagesSent, journalHashMapBackup.size()); + } + + protected HAPolicyConfiguration createReplicationLiveConfiguration() { + ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + return haPolicy; + } + + protected HAPolicyConfiguration createReplicationBackupConfiguration() { + ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + haPolicy.setClusterName("cluster"); + return haPolicy; + } + + protected Configuration createLiveConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setJournalType(JournalType.NIO); + conf.setName("localhost::live"); + + File liveDir = newTemporaryFolder("live"); + conf.setBrokerInstance(liveDir); + + conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + conf.setHAPolicyConfiguration(createReplicationLiveConfiguration()); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); + ccconf.setName("cluster"); + ccconf.setConnectorName("live"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.NIO).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + protected Configuration createBackupConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::backup"); + + File backupDir = newTemporaryFolder("backup"); + conf.setBrokerInstance(backupDir); + + conf.setHAPolicyConfiguration(createReplicationBackupConfiguration()); + + conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); + ccconf.setName("cluster"); + ccconf.setConnectorName("backup"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.NIO).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayWithReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayWithReplicationTest.java index c0ddfd38e3..a4e5db0f6f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayWithReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayWithReplicationTest.java @@ -1,153 +1,153 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.retention; - -import org.apache.activemq.artemis.api.core.QueueConfiguration; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; -import org.apache.activemq.artemis.core.config.ha.DistributedLockManagerConfiguration; -import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; -import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServers; -import org.apache.activemq.artemis.core.server.JournalType; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; -import org.apache.activemq.artemis.tests.util.Wait; -import org.junit.jupiter.api.BeforeEach; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.TimeUnit; - -public class ReplayWithReplicationTest extends ReplayTest { - - private DistributedLockManagerConfiguration managerConfiguration; - ActiveMQServer backupServer; - - File newTemporaryFolder(String name) { - File newFolder = new File(temporaryFolder, name); - newFolder.mkdirs(); - return newFolder; - } - - @BeforeEach - @Override - public void setUp() throws Exception { - managerConfiguration = new DistributedLockManagerConfiguration(FileBasedLockManager.class.getName(), Collections.singletonMap("locks-folder", newTemporaryFolder("manager").toString())); - final int timeout = (int) TimeUnit.SECONDS.toMillis(30); - - // start live - Configuration liveConfiguration = createLiveConfiguration(); - - server = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration)); - server.setIdentity("PRIMARY"); - server.getConfiguration().setJournalRetentionDirectory(getJournalDir() + "retention"); - server.getConfiguration().setJournalFileSize(100 * 1024); - - server.start(); - - server.addAddressInfo(new AddressInfo("t1").addRoutingType(RoutingType.ANYCAST)); - server.createQueue(QueueConfiguration.of("t1").setAddress("t1").setRoutingType(RoutingType.ANYCAST)); - - server.addAddressInfo(new AddressInfo("t2").addRoutingType(RoutingType.ANYCAST)); - server.createQueue(QueueConfiguration.of("t2").setAddress("t2").setRoutingType(RoutingType.ANYCAST)); - - // start backup - Configuration backupConfiguration = createBackupConfiguration(); - ((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true); - backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration)); - backupServer.setIdentity("BACKUP"); - backupServer.start(); - - Wait.waitFor(backupServer::isStarted); - - org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout); - } - - protected HAPolicyConfiguration createReplicationLiveConfiguration() { - ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault(); - haPolicy.setDistributedManagerConfiguration(managerConfiguration); - return haPolicy; - } - - protected HAPolicyConfiguration createReplicationBackupConfiguration() { - ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault(); - haPolicy.setDistributedManagerConfiguration(managerConfiguration); - haPolicy.setClusterName("cluster"); - return haPolicy; - } - - protected Configuration createLiveConfiguration() throws Exception { - Configuration conf = new ConfigurationImpl(); - conf.setName("localhost::live"); - - File liveDir = newTemporaryFolder("live"); - conf.setBrokerInstance(liveDir); - - conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); - conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); - conf.addConnectorConfiguration("live", "tcp://localhost:61616"); - - conf.setClusterUser("mycluster"); - conf.setClusterPassword("mypassword"); - - conf.setHAPolicyConfiguration(createReplicationLiveConfiguration()); - - ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); - ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); - ccconf.setName("cluster"); - ccconf.setConnectorName("live"); - conf.addClusterConfiguration(ccconf); - - conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); - - return conf; - } - - protected Configuration createBackupConfiguration() throws Exception { - Configuration conf = new ConfigurationImpl(); - conf.setName("localhost::backup"); - - File backupDir = newTemporaryFolder("backup"); - conf.setBrokerInstance(backupDir); - - conf.setHAPolicyConfiguration(createReplicationBackupConfiguration()); - - conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); - conf.addConnectorConfiguration("live", "tcp://localhost:61616"); - conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); - - conf.setClusterUser("mycluster"); - conf.setClusterPassword("mypassword"); - - ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); - ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); - ccconf.setName("cluster"); - ccconf.setConnectorName("backup"); - conf.addClusterConfiguration(ccconf); - - conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); - - return conf; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.retention; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.DistributedLockManagerConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.BeforeEach; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +public class ReplayWithReplicationTest extends ReplayTest { + + private DistributedLockManagerConfiguration managerConfiguration; + ActiveMQServer backupServer; + + File newTemporaryFolder(String name) { + File newFolder = new File(temporaryFolder, name); + newFolder.mkdirs(); + return newFolder; + } + + @BeforeEach + @Override + public void setUp() throws Exception { + managerConfiguration = new DistributedLockManagerConfiguration(FileBasedLockManager.class.getName(), Collections.singletonMap("locks-folder", newTemporaryFolder("manager").toString())); + final int timeout = (int) TimeUnit.SECONDS.toMillis(30); + + // start live + Configuration liveConfiguration = createLiveConfiguration(); + + server = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration)); + server.setIdentity("PRIMARY"); + server.getConfiguration().setJournalRetentionDirectory(getJournalDir() + "retention"); + server.getConfiguration().setJournalFileSize(100 * 1024); + + server.start(); + + server.addAddressInfo(new AddressInfo("t1").addRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of("t1").setAddress("t1").setRoutingType(RoutingType.ANYCAST)); + + server.addAddressInfo(new AddressInfo("t2").addRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of("t2").setAddress("t2").setRoutingType(RoutingType.ANYCAST)); + + // start backup + Configuration backupConfiguration = createBackupConfiguration(); + ((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true); + backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration)); + backupServer.setIdentity("BACKUP"); + backupServer.start(); + + Wait.waitFor(backupServer::isStarted); + + org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout); + } + + protected HAPolicyConfiguration createReplicationLiveConfiguration() { + ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + return haPolicy; + } + + protected HAPolicyConfiguration createReplicationBackupConfiguration() { + ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + haPolicy.setClusterName("cluster"); + return haPolicy; + } + + protected Configuration createLiveConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::live"); + + File liveDir = newTemporaryFolder("live"); + conf.setBrokerInstance(liveDir); + + conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + conf.setHAPolicyConfiguration(createReplicationLiveConfiguration()); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); + ccconf.setName("cluster"); + ccconf.setConnectorName("live"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + protected Configuration createBackupConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::backup"); + + File backupDir = newTemporaryFolder("backup"); + conf.setBrokerInstance(backupDir); + + conf.setHAPolicyConfiguration(createReplicationBackupConfiguration()); + + conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); + ccconf.setName("cluster"); + ccconf.setConnectorName("backup"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + +} diff --git a/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp-updated.xml b/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp-updated.xml index e7c768d91e..36165a3027 100644 --- a/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp-updated.xml +++ b/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp-updated.xml @@ -1,121 +1,121 @@ -<?xml version='1.0'?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<configuration xmlns="urn:activemq" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> - - <core xmlns="urn:activemq:core"> - - <name>0.0.0.0</name> - - <configuration-file-refresh-period>100</configuration-file-refresh-period> - - <persistence-enabled>false</persistence-enabled> - - <security-enabled>false</security-enabled> - - <!-- this could be ASYNCIO or NIO - --> - <journal-type>NIO</journal-type> - - <paging-directory>./data/paging</paging-directory> - - <bindings-directory>./data/bindings</bindings-directory> - - <journal-directory>./data/journal</journal-directory> - - <large-messages-directory>./data/large-messages</large-messages-directory> - - <journal-min-files>2</journal-min-files> - - <journal-pool-files>-1</journal-pool-files> - - <!-- - This value was determined through a calculation. - Your system could perform 25 writes per millisecond - on the current journal configuration. - That translates as a sync write every 40000 nanoseconds - --> - <journal-buffer-timeout>40000</journal-buffer-timeout> - - - <acceptors> - <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. --> - <!-- performance tests have shown that openWire performs best with these buffer sizes --> - <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> - - <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> - <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor> - - <!-- STOMP Acceptor. --> - <acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor> - - <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. --> - <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor> - - <!-- MQTT Acceptor --> - <acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor> - - </acceptors> - - - <security-settings> - <security-setting match="#"> - <permission type="createNonDurableQueue" roles="a"/> - <permission type="deleteNonDurableQueue" roles="a"/> - <permission type="createDurableQueue" roles="a"/> - <permission type="deleteDurableQueue" roles="a"/> - <permission type="browse" roles="a"/> - <permission type="send" roles="a"/> - <!-- we need this otherwise ./artemis data imp wouldn't work --> - <permission type="manage" roles="a"/> - </security-setting> - </security-settings> - - <address-settings> - - - <address-setting match="#"> - <auto-create-queues>false</auto-create-queues> - <dead-letter-address>DLQ</dead-letter-address> - <expiry-address>ExpiryQueue</expiry-address> - <redelivery-delay>0</redelivery-delay> - <max-size-bytes>10Mb</max-size-bytes> - <message-counter-history-day-limit>10</message-counter-history-day-limit> - <address-full-policy>BLOCK</address-full-policy> - <config-delete-queues>FORCE</config-delete-queues> - <config-delete-addresses>FORCE</config-delete-addresses> - </address-setting> - </address-settings> - - <wildcard-addresses> - <delimiter>_</delimiter> - </wildcard-addresses> - - <addresses> - <address name="queue"> - <anycast> - <queue name="queue"/> - </anycast> - </address> - </addresses> - </core> -</configuration> +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <!-- this could be ASYNCIO or NIO + --> + <journal-type>NIO</journal-type> + + <paging-directory>./data/paging</paging-directory> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/large-messages</large-messages-directory> + + <journal-min-files>2</journal-min-files> + + <journal-pool-files>-1</journal-pool-files> + + <!-- + This value was determined through a calculation. + Your system could perform 25 writes per millisecond + on the current journal configuration. + That translates as a sync write every 40000 nanoseconds + --> + <journal-buffer-timeout>40000</journal-buffer-timeout> + + + <acceptors> + <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. --> + <!-- performance tests have shown that openWire performs best with these buffer sizes --> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + + <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> + <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor> + + <!-- STOMP Acceptor. --> + <acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor> + + <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. --> + <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor> + + <!-- MQTT Acceptor --> + <acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor> + + </acceptors> + + + <security-settings> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="a"/> + <permission type="deleteNonDurableQueue" roles="a"/> + <permission type="createDurableQueue" roles="a"/> + <permission type="deleteDurableQueue" roles="a"/> + <permission type="browse" roles="a"/> + <permission type="send" roles="a"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="a"/> + </security-setting> + </security-settings> + + <address-settings> + + + <address-setting match="#"> + <auto-create-queues>false</auto-create-queues> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <max-size-bytes>10Mb</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>BLOCK</address-full-policy> + <config-delete-queues>FORCE</config-delete-queues> + <config-delete-addresses>FORCE</config-delete-addresses> + </address-setting> + </address-settings> + + <wildcard-addresses> + <delimiter>_</delimiter> + </wildcard-addresses> + + <addresses> + <address name="queue"> + <anycast> + <queue name="queue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp.xml b/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp.xml index 05b7fd30f4..8a18b3c476 100644 --- a/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp.xml +++ b/tests/integration-tests/src/test/resources/RedeployTempTest-reload-temp.xml @@ -1,121 +1,121 @@ -<?xml version='1.0'?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<configuration xmlns="urn:activemq" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> - - <core xmlns="urn:activemq:core"> - - <name>0.0.0.0</name> - - <configuration-file-refresh-period>100</configuration-file-refresh-period> - - <persistence-enabled>false</persistence-enabled> - - <security-enabled>false</security-enabled> - - <!-- this could be ASYNCIO or NIO - --> - <journal-type>NIO</journal-type> - - <paging-directory>./data/paging</paging-directory> - - <bindings-directory>./data/bindings</bindings-directory> - - <journal-directory>./data/journal</journal-directory> - - <large-messages-directory>./data/large-messages</large-messages-directory> - - <journal-min-files>2</journal-min-files> - - <journal-pool-files>-1</journal-pool-files> - - <!-- - This value was determined through a calculation. - Your system could perform 25 writes per millisecond - on the current journal configuration. - That translates as a sync write every 40000 nanoseconds - --> - <journal-buffer-timeout>40000</journal-buffer-timeout> - - - <acceptors> - <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. --> - <!-- performance tests have shown that openWire performs best with these buffer sizes --> - <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> - - <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> - <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor> - - <!-- STOMP Acceptor. --> - <acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor> - - <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. --> - <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor> - - <!-- MQTT Acceptor --> - <acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor> - - </acceptors> - - - <security-settings> - <security-setting match="#"> - <permission type="createNonDurableQueue" roles="a"/> - <permission type="deleteNonDurableQueue" roles="a"/> - <permission type="createDurableQueue" roles="a"/> - <permission type="deleteDurableQueue" roles="a"/> - <permission type="browse" roles="a"/> - <permission type="send" roles="a"/> - <!-- we need this otherwise ./artemis data imp wouldn't work --> - <permission type="manage" roles="a"/> - </security-setting> - </security-settings> - - <address-settings> - - - <address-setting match="#"> - <auto-create-queues>false</auto-create-queues> - <dead-letter-address>DLQ</dead-letter-address> - <expiry-address>ExpiryQueue</expiry-address> - <redelivery-delay>0</redelivery-delay> - <max-size-bytes>10Mb</max-size-bytes> - <message-counter-history-day-limit>10</message-counter-history-day-limit> - <address-full-policy>BLOCK</address-full-policy> - <config-delete-queues>FORCE</config-delete-queues> - <config-delete-addresses>FORCE</config-delete-addresses> - </address-setting> - </address-settings> - - <wildcard-addresses> - <delimiter>_</delimiter> - </wildcard-addresses> - - <addresses> - <address name="queue"> - <anycast> - <queue name="queue"/> - </anycast> - </address> - </addresses> - </core> -</configuration> +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <!-- this could be ASYNCIO or NIO + --> + <journal-type>NIO</journal-type> + + <paging-directory>./data/paging</paging-directory> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/large-messages</large-messages-directory> + + <journal-min-files>2</journal-min-files> + + <journal-pool-files>-1</journal-pool-files> + + <!-- + This value was determined through a calculation. + Your system could perform 25 writes per millisecond + on the current journal configuration. + That translates as a sync write every 40000 nanoseconds + --> + <journal-buffer-timeout>40000</journal-buffer-timeout> + + + <acceptors> + <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. --> + <!-- performance tests have shown that openWire performs best with these buffer sizes --> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + + <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> + <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor> + + <!-- STOMP Acceptor. --> + <acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor> + + <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. --> + <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor> + + <!-- MQTT Acceptor --> + <acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor> + + </acceptors> + + + <security-settings> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="a"/> + <permission type="deleteNonDurableQueue" roles="a"/> + <permission type="createDurableQueue" roles="a"/> + <permission type="deleteDurableQueue" roles="a"/> + <permission type="browse" roles="a"/> + <permission type="send" roles="a"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="a"/> + </security-setting> + </security-settings> + + <address-settings> + + + <address-setting match="#"> + <auto-create-queues>false</auto-create-queues> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <max-size-bytes>10Mb</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>BLOCK</address-full-policy> + <config-delete-queues>FORCE</config-delete-queues> + <config-delete-addresses>FORCE</config-delete-addresses> + </address-setting> + </address-settings> + + <wildcard-addresses> + <delimiter>_</delimiter> + </wildcard-addresses> + + <addresses> + <address name="queue"> + <anycast> + <queue name="queue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
