This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new cf85d35 ARTEMIS-3308 - support federation of large messages
cf85d35 is described below
commit cf85d35355a4c9965a1916066bfdde821c8a84bb
Author: gtully <[email protected]>
AuthorDate: Tue Oct 19 11:19:36 2021 +0100
ARTEMIS-3308 - support federation of large messages
---
.../impl/CompressedLargeMessageControllerImpl.java | 5 ++++
.../core/client/impl/LargeMessageController.java | 1 +
.../client/impl/LargeMessageControllerImpl.java | 7 +++++-
.../artemis/core/server/ActiveMQServerLogger.java | 5 ++++
.../federation/FederatedQueueConsumerImpl.java | 26 ++++++++++++++++++-
.../integration/federation/FederatedQueueTest.java | 29 +++++++++++++++++++++-
6 files changed, 70 insertions(+), 3 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 58c3511..a691387 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -89,6 +89,11 @@ final class CompressedLargeMessageControllerImpl implements
LargeMessageControll
}
@Override
+ public LargeMessageControllerImpl.LargeData take() throws
InterruptedException {
+ return bufferDelegate.take();
+ }
+
+ @Override
public int capacity() {
return -1;
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
index 165a4d6..0565c83 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
@@ -61,4 +61,5 @@ public interface LargeMessageController extends
ActiveMQBuffer {
*/
boolean waitCompletion(long timeWait) throws ActiveMQException;
+ LargeMessageControllerImpl.LargeData take() throws InterruptedException;
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index f91878d..be2bb78 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -318,6 +318,11 @@ public class LargeMessageControllerImpl implements
LargeMessageController {
}
+ @Override
+ public LargeData take() throws InterruptedException {
+ return largeMessageData.take();
+ }
+
/**
* @throws ActiveMQException
*/
@@ -1328,7 +1333,7 @@ public class LargeMessageControllerImpl implements
LargeMessageController {
throw new
IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- private static class LargeData {
+ public static class LargeData {
final byte[] chunk;
final int flowControlSize;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index d10016b..ac75bec 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1763,6 +1763,11 @@ public interface ActiveMQServerLogger extends
BasicLogger {
@Message(id = 222304, value = "Unable to load message from journal", format
= Message.Format.MESSAGE_FORMAT)
void unableToLoadMessageFromJournal(@Cause Throwable t);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222305, value = "Error federating message {0}.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void federationDispatchError(@Cause Throwable e, String message);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format =
Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index 0f408a1..c060427 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -28,12 +28,17 @@ import
org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
+import static
org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;
+
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer,
SessionFailureListener {
private static final Logger logger =
Logger.getLogger(FederatedQueueConsumerImpl.class);
@@ -174,6 +179,24 @@ public class FederatedQueueConsumerImpl implements
FederatedQueueConsumer, Sessi
@Override
public void onMessage(ClientMessage clientMessage) {
try {
+ Message message = clientMessage;
+ if (message instanceof ClientLargeMessageInternal) {
+
+ final StorageManager storageManager = server.getStorageManager();
+ LargeServerMessage lsm =
storageManager.createLargeMessage(storageManager.generateID(), message);
+
+ LargeData largeData = null;
+ do {
+ // block on reading all pending chunks, ok as we are called
from an executor
+ largeData = ((ClientLargeMessageInternal)
clientMessage).getLargeMessageController().take();
+ lsm.addBytes(largeData.getChunk());
+ }
+ while (largeData.isContinues());
+
+ message = lsm.toMessage();
+ lsm.releaseResources(true, true);
+ }
+
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin ->
plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
@@ -183,7 +206,7 @@ public class FederatedQueueConsumerImpl implements
FederatedQueueConsumer, Sessi
}
}
- Message message = transformer == null ? clientMessage :
transformer.transform(clientMessage);
+ message = transformer == null ? message :
transformer.transform(message);
if (message != null) {
server.getPostOffice().route(message, true);
}
@@ -198,6 +221,7 @@ public class FederatedQueueConsumerImpl implements
FederatedQueueConsumer, Sessi
}
}
} catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.federationDispatchError(e,
clientMessage.toString());
try {
clientSession.rollback();
} catch (ActiveMQException e1) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
index 1b5b814..6acbc77 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
@@ -61,7 +61,7 @@ public class FederatedQueueTest extends FederatedTestBase {
@Override
protected void configureQueues(ActiveMQServer server) throws Exception {
- server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(-1));
createSimpleQueue(server, getName());
}
@@ -244,6 +244,33 @@ public class FederatedQueueTest extends FederatedTestBase {
}
@Test
+ public void testWithLargeMessage() throws Exception {
+ String queueName = getName();
+
+ FederationConfiguration federationConfiguration =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1",
queueName);
+
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
+ getServer(0).getFederationManager().deploy();
+
+ ConnectionFactory cf1 = getCF(1);
+ ConnectionFactory cf0 = getCF(0);
+ final String payload = new String(new byte[1 * 1024 *
1024]).replace('\0','+');
+ try (Connection connection1 = cf1.createConnection(); Connection
connection0 = cf0.createConnection()) {
+ connection1.start();
+ Session session1 = connection1.createSession();
+ Queue queue1 = session1.createQueue(queueName);
+ MessageProducer producer = session1.createProducer(queue1);
+ producer.send(session1.createTextMessage(payload));
+
+ connection0.start();
+ Session session0 = connection0.createSession();
+ Queue queue0 = session0.createQueue(queueName);
+ MessageConsumer consumer0 = session0.createConsumer(queue0);
+
+ assertNotNull(consumer0.receive(60000));
+ }
+ }
+
+ @Test
public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist()
throws Exception {
String queueName = getName();
ConnectionFactory cf0 = getCF(0);