Repository: activemq-artemis Updated Branches: refs/heads/master aa4cac64b -> c67df73b8
ARTEMIS-238 and ARTEMIS-236 Moving HQClient to its own module avoiding uncessary server's dependencies Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bc828c00 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bc828c00 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bc828c00 Branch: refs/heads/master Commit: bc828c0017c474164b2f46cdd84ae93d5bf8c682 Parents: aa4cac6 Author: Clebert Suconic <[email protected]> Authored: Thu Oct 22 10:05:58 2015 -0400 Committer: Clebert Suconic <[email protected]> Committed: Thu Oct 22 10:15:14 2015 -0400 ---------------------------------------------------------------------- artemis-distribution/src/main/assembly/dep.xml | 1 + .../artemis-hornetq-protocol/pom.xml | 5 + .../HQPropertiesConversionInterceptor.java | 54 ---------- .../client/HornetQClientProtocolManager.java | 67 ------------ .../HornetQClientProtocolManagerFactory.java | 46 --------- .../client/HornetQClientSessionContext.java | 101 ------------------- .../hornetq/util/HQPropertiesConverter.java | 86 ---------------- .../artemis-hqclient-protocol/pom.xml | 53 ++++++++++ .../HQPropertiesConversionInterceptor.java | 54 ++++++++++ .../client/HornetQClientProtocolManager.java | 67 ++++++++++++ .../HornetQClientProtocolManagerFactory.java | 46 +++++++++ .../client/HornetQClientSessionContext.java | 101 +++++++++++++++++++ .../hornetq/util/HQPropertiesConverter.java | 86 ++++++++++++++++ ...mis.spi.core.protocol.ProtocolManagerFactory | 1 + artemis-protocols/pom.xml | 1 + 15 files changed, 415 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-distribution/src/main/assembly/dep.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 084d248..ad6bee5 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -59,6 +59,7 @@ <include>org.apache.activemq:artemis-openwire-protocol</include> <include>org.apache.activemq:artemis-proton-plug</include> <include>org.apache.activemq:artemis-hornetq-protocol</include> + <include>org.apache.activemq:artemis-hqclient-protocol</include> <include>org.apache.activemq:artemis-stomp-protocol</include> <include>org.apache.activemq:artemis-mqtt-protocol</include> <include>org.apache.activemq:artemis-ra</include> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hornetq-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/pom.xml b/artemis-protocols/artemis-hornetq-protocol/pom.xml index 4b579c2..587ca1e 100644 --- a/artemis-protocols/artemis-hornetq-protocol/pom.xml +++ b/artemis-protocols/artemis-hornetq-protocol/pom.xml @@ -48,6 +48,11 @@ <artifactId>artemis-server</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-hqclient-protocol</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java deleted file mode 100644 index 012727f..0000000 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.protocol.hornetq; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI; -import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; - -public class HQPropertiesConversionInterceptor implements Interceptor { - - - private final boolean replaceHQ; - - public HQPropertiesConversionInterceptor(final boolean replaceHQ) { - this.replaceHQ = replaceHQ; - } - - @Override - public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { - - if (HQPropertiesConverter.isMessagePacket(packet)) { - handleReceiveMessage((MessagePacketI) packet); - } - return true; - } - - private void handleReceiveMessage(MessagePacketI messagePacket) { - if (replaceHQ) { - HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage()); - } - else { - HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java deleted file mode 100644 index a1d9a60..0000000 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.protocol.hornetq.client; - -import org.apache.activemq.artemis.core.protocol.core.Channel; -import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; -import org.apache.activemq.artemis.core.version.Version; -import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.SessionContext; - -public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager { - - private static final int VERSION_PLAYED = 123; - protected void sendHandshake(Connection transportConnection) { - } - - - protected SessionContext newSessionContext(String name, - int confirmationWindowSize, - Channel sessionChannel, - CreateSessionResponseMessage response) { - // these objects won't be null, otherwise it would keep retrying on the previous loop - return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); - } - - @Override - protected Packet newCreateSessionPacket(Version clientVersion, - String name, - String username, - String password, - boolean xa, - boolean autoCommitSends, - boolean autoCommitAcks, - boolean preAcknowledge, - int minLargeMessageSize, - int confirmationWindowSize, - long sessionChannelID) { - return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); - } - - @Override - public void sendSubscribeTopology(final boolean isServer) { - getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); - } - - - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java deleted file mode 100644 index ed57cfe..0000000 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.protocol.hornetq.client; - -import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor; -import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; -import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; - -public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory { - - - ServerLocator locator; - - @Override - public ServerLocator getLocator() { - return locator; - } - - @Override - public void setLocator(ServerLocator locator) { - this.locator = locator; - locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true)); - locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false)); - } - - @Override - public ClientProtocolManager newProtocolManager() { - return new HornetQClientProtocolManager(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java deleted file mode 100644 index 169a82a..0000000 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.protocol.hornetq.client; - -import java.util.concurrent.Executor; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl; -import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; -import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; -import org.apache.activemq.artemis.core.protocol.core.Channel; -import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; -import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; - -public class HornetQClientSessionContext extends ActiveMQSessionContext { - - public HornetQClientSessionContext(String name, - RemotingConnection remotingConnection, - Channel sessionChannel, - int serverVersion, - int confirmationWindow) { - super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow); - } - - - public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { - SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); - SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); - - return response.toQueueQuery(); - } - - protected CreateSessionMessage newCreateSession(String username, - String password, - int minLargeMessageSize, - boolean xa, - boolean autoCommitSends, - boolean autoCommitAcks, - boolean preAcknowledge, - SimpleString defaultAddress) { - return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); - } - - - public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { - SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); - - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false); - } - - public ClientConsumerInternal createConsumer(SimpleString queueName, - SimpleString filterString, - int windowSize, - int maxRate, - int ackBatchSize, - boolean browseOnly, - Executor executor, - Executor flowControlExecutor) throws ActiveMQException { - long consumerID = idGenerator.generateID(); - - ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); - - SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); - - SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); - - // The actual windows size that gets used is determined by the user since - // could be overridden on the queue settings - // The value we send is just a hint - - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); - } - - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java deleted file mode 100644 index 9240e55..0000000 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.protocol.hornetq.util; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; - -public class HQPropertiesConverter { - - private static Map<SimpleString, SimpleString> hqAmqDictionary; - private static Map<SimpleString, SimpleString> amqHqDictionary; - - static { - Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>(); - - // Add entries for outgoing messages - d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY")); - d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS")); - d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE")); - d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID")); - d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID")); - d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED")); - d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE")); - d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY")); - d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID")); - d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME")); - - hqAmqDictionary = Collections.unmodifiableMap(d); - - d = new HashMap<>(); - // inverting the direction - for (Map.Entry<SimpleString, SimpleString> entry: hqAmqDictionary.entrySet()) { - d.put(entry.getValue(), entry.getKey()); - } - - amqHqDictionary = Collections.unmodifiableMap(d); - } - - public static void replaceAMQProperties(final Message message) { - replaceDict(message, amqHqDictionary); - } - - public static void replaceHQProperties(final Message message) { - replaceDict(message, hqAmqDictionary); - } - - private static void replaceDict(final Message message, Map<SimpleString, SimpleString> dictionary) { - for (SimpleString property : new HashSet<>(message.getPropertyNames())) { - SimpleString replaceTo = dictionary.get(property); - if (replaceTo != null) { - message.putObjectProperty(replaceTo, message.removeProperty(property)); - } - } - } - - public static boolean isMessagePacket(Packet packet) { - int type = packet.getType(); - return type == PacketImpl.SESS_SEND || - type == PacketImpl.SESS_SEND_LARGE || - type == PacketImpl.SESS_RECEIVE_LARGE_MSG || - type == PacketImpl.SESS_RECEIVE_MSG; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/pom.xml b/artemis-protocols/artemis-hqclient-protocol/pom.xml new file mode 100644 index 0000000..7f867ed --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/pom.xml @@ -0,0 +1,53 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>artemis-protocols</artifactId> + <groupId>org.apache.activemq</groupId> + <version>1.1.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>artemis-hqclient-protocol</artifactId> + + <properties> + <activemq.basedir>${project.basedir}/../..</activemq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + <scope>provided</scope> + <optional>true</optional> + </dependency> + + <!-- + JBoss Logging + --> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-core-client</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java new file mode 100644 index 0000000..012727f --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI; +import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; + +public class HQPropertiesConversionInterceptor implements Interceptor { + + + private final boolean replaceHQ; + + public HQPropertiesConversionInterceptor(final boolean replaceHQ) { + this.replaceHQ = replaceHQ; + } + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + + if (HQPropertiesConverter.isMessagePacket(packet)) { + handleReceiveMessage((MessagePacketI) packet); + } + return true; + } + + private void handleReceiveMessage(MessagePacketI messagePacket) { + if (replaceHQ) { + HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage()); + } + else { + HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java new file mode 100644 index 0000000..a1d9a60 --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.version.Version; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; + +public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager { + + private static final int VERSION_PLAYED = 123; + protected void sendHandshake(Connection transportConnection) { + } + + + protected SessionContext newSessionContext(String name, + int confirmationWindowSize, + Channel sessionChannel, + CreateSessionResponseMessage response) { + // these objects won't be null, otherwise it would keep retrying on the previous loop + return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); + } + + @Override + protected Packet newCreateSessionPacket(Version clientVersion, + String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize, + long sessionChannelID) { + return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + } + + @Override + public void sendSubscribeTopology(final boolean isServer) { + getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); + } + + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java new file mode 100644 index 0000000..ed57cfe --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; + +public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory { + + + ServerLocator locator; + + @Override + public ServerLocator getLocator() { + return locator; + } + + @Override + public void setLocator(ServerLocator locator) { + this.locator = locator; + locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true)); + locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false)); + } + + @Override + public ClientProtocolManager newProtocolManager() { + return new HornetQClientProtocolManager(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java new file mode 100644 index 0000000..169a82a --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; + +public class HornetQClientSessionContext extends ActiveMQSessionContext { + + public HornetQClientSessionContext(String name, + RemotingConnection remotingConnection, + Channel sessionChannel, + int serverVersion, + int confirmationWindow) { + super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow); + } + + + public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + return response.toQueueQuery(); + } + + protected CreateSessionMessage newCreateSession(String username, + String password, + int minLargeMessageSize, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + SimpleString defaultAddress) { + return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); + } + + + public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { + SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false); + } + + public ClientConsumerInternal createConsumer(SimpleString queueName, + SimpleString filterString, + int windowSize, + int maxRate, + int ackBatchSize, + boolean browseOnly, + Executor executor, + Executor flowControlExecutor) throws ActiveMQException { + long consumerID = idGenerator.generateID(); + + ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); + + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); + + SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + // The actual windows size that gets used is determined by the user since + // could be overridden on the queue settings + // The value we send is just a hint + + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java new file mode 100644 index 0000000..9240e55 --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.util; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +public class HQPropertiesConverter { + + private static Map<SimpleString, SimpleString> hqAmqDictionary; + private static Map<SimpleString, SimpleString> amqHqDictionary; + + static { + Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>(); + + // Add entries for outgoing messages + d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY")); + d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS")); + d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE")); + d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID")); + d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID")); + d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED")); + d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE")); + d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY")); + d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID")); + d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME")); + + hqAmqDictionary = Collections.unmodifiableMap(d); + + d = new HashMap<>(); + // inverting the direction + for (Map.Entry<SimpleString, SimpleString> entry: hqAmqDictionary.entrySet()) { + d.put(entry.getValue(), entry.getKey()); + } + + amqHqDictionary = Collections.unmodifiableMap(d); + } + + public static void replaceAMQProperties(final Message message) { + replaceDict(message, amqHqDictionary); + } + + public static void replaceHQProperties(final Message message) { + replaceDict(message, hqAmqDictionary); + } + + private static void replaceDict(final Message message, Map<SimpleString, SimpleString> dictionary) { + for (SimpleString property : new HashSet<>(message.getPropertyNames())) { + SimpleString replaceTo = dictionary.get(property); + if (replaceTo != null) { + message.putObjectProperty(replaceTo, message.removeProperty(property)); + } + } + } + + public static boolean isMessagePacket(Packet packet) { + int type = packet.getType(); + return type == PacketImpl.SESS_SEND || + type == PacketImpl.SESS_SEND_LARGE || + type == PacketImpl.SESS_RECEIVE_LARGE_MSG || + type == PacketImpl.SESS_RECEIVE_MSG; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/artemis-hqclient-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory b/artemis-protocols/artemis-hqclient-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory new file mode 100644 index 0000000..059e800 --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory @@ -0,0 +1 @@ +org.apache.activemq.artemis.core.protocol.hornetq.HornetQProtocolManagerFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc828c00/artemis-protocols/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/pom.xml b/artemis-protocols/pom.xml index ab0fc47..5eb7277 100644 --- a/artemis-protocols/pom.xml +++ b/artemis-protocols/pom.xml @@ -36,6 +36,7 @@ <module>artemis-openwire-protocol</module> <module>artemis-proton-plug</module> <module>artemis-hornetq-protocol</module> + <module>artemis-hqclient-protocol</module> <module>artemis-mqtt-protocol</module> </modules>
