This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new ef216972 [ISSUE #1107] [Java] LiteTopic support for "RIP‐83 Lite
Topic" (#1108)
ef216972 is described below
commit ef216972730c47eeaaf7575a1d125bfd4fbabe4e
Author: Quan <[email protected]>
AuthorDate: Mon Oct 20 11:47:57 2025 +0800
[ISSUE #1107] [Java] LiteTopic support for "RIP‐83 Lite Topic" (#1108)
* producer lite topic
* new MessageImpl with MessageBuilderImpl
* lite push consumer
* createFifoConsumeService
* default fifo true for LitePushConsumerSettings
* lite push consumer sub bindTopic *
* 1. fix heartbeat;
2. add version for interest;
* upgrade to 5.0.9-lite-topic-SNAPSHOT and fix logback
* add some test
* subscribeLite List<String>
* Collection instead of List
* check point
* syncLiteSubscription
* code refactor
* fix check style
* onNotifyUnsubscribeLiteCommand
* LiteSubscriptionQuotaExceededException and LiteTopicQuotaExceededException
* subscribeLite atomic
* client side quota check
* client-side lite subscription quota limit
* checkLiteSubscriptionQuota 3000
* validateLiteTopic for lite push consumer
* ack with lite topic if present.
* forward syncSettings exception to upper layer
* set liteSubscriptionQuota 1200
* lite push consumer setInvisibleDuration
* LitePushConsumerSettings#toString more fields
* revert InvisibleDuration
* log when subscribeLite raise ClientException
* LitePushConsumerImplTest
* clearer LitePushConsumer interface
* remove subscribeLite(Collection<String>)
* code refactor and more unit test
* remove setEnableFifoConsumeAccelerator
* code recover
* fix
* fix
* fix code style
* add liteTopic for ForwardMessageToDeadLetterQueueRequest
* fix normal group consume lite topic
* log client type
* notify unsubscribe lite with liteTopic only
* fix test
* complete lite example
* LitePushConsumer#getLiteTopicSet
* fix comments
* remove fifo = true
* log warn for onNotifyUnsubscribeLiteCommand default implementation
* use new proto and update to 5.1.0-lite-topic-SNAPSHOT
Change-Id: I01f0c9c64be2614d0215560240548bba2078e505
* log when subscribeLite and unsubscribeLite
Change-Id: I01cb9cd4d9a474218e674662407708cf069bf426
* revert pom config
Change-Id: I7cc3f5624bf73b8cfeb902f4e0fc3b1fc7d4062d
* rocketmq-proto.version to 2.1.0-SNAPSHOT
Change-Id: I686e9ab7eaed1af1d1b01a57a88438af4bfa1878
* liteSubscriptionQuota default to 0
Change-Id: I3bcfaaa323b1eea3bd0665ab1d8722a7bc47a4fe
* upgrade to 5.1.0-SNAPSHOT
Change-Id: I360dbd38de8fa2cc21ab4ebb827fcbdce0258fa6
* add comments
Change-Id: I062b4a92032788b6c361304d45b6a40e9a28923a
* add comments
Change-Id: I53a21ec7faa0ba85d46c020a1709f4e011c799ec
---------
Co-authored-by: Quan <[email protected]>
Co-authored-by: moling <[email protected]>
---
java/client-apis/pom.xml | 2 +-
.../client/apis/ClientServiceProvider.java | 8 +
.../client/apis/consumer/LitePushConsumer.java | 73 +++++++
.../apis/consumer/LitePushConsumerBuilder.java | 93 +++++++++
.../rocketmq/client/apis/message/Message.java | 7 +
.../client/apis/message/MessageBuilder.java | 8 +
.../rocketmq/client/apis/message/MessageView.java | 7 +
java/client-shade/pom.xml | 2 +-
java/client/pom.xml | 2 +-
.../client/java/example/LiteProducerExample.java | 66 +++++++
.../java/example/LitePushConsumerExample.java | 100 ++++++++++
.../LiteSubscriptionQuotaExceededException.java} | 21 +-
.../LiteTopicQuotaExceededException.java} | 21 +-
.../client/java/exception/StatusChecker.java | 5 +
.../rocketmq/client/java/impl/ClientImpl.java | 12 ++
.../rocketmq/client/java/impl/ClientManager.java | 16 ++
.../client/java/impl/ClientManagerImpl.java | 20 ++
.../java/impl/ClientServiceProviderImpl.java | 14 +-
.../client/java/impl/ClientSessionImpl.java | 21 +-
.../rocketmq/client/java/impl/ClientType.java | 4 +
.../apache/rocketmq/client/java/impl/Settings.java | 4 +
.../client/java/impl/consumer/ConsumerImpl.java | 17 +-
.../impl/consumer/LitePushConsumerBuilderImpl.java | 108 ++++++++++
.../java/impl/consumer/LitePushConsumerImpl.java | 220 +++++++++++++++++++++
.../impl/consumer/LitePushConsumerSettings.java | 127 ++++++++++++
.../java/impl/consumer/ProcessQueueImpl.java | 6 +-
.../java/impl/consumer/PushConsumerImpl.java | 55 +++---
.../impl/consumer/PushSubscriptionSettings.java | 25 ++-
.../java/impl/consumer/SimpleConsumerImpl.java | 8 -
.../java/impl/producer/ClientSessionHandler.java | 6 +
.../client/java/message/GeneralMessage.java | 7 +
.../client/java/message/GeneralMessageImpl.java | 8 +
.../client/java/message/MessageBuilderImpl.java | 28 ++-
.../rocketmq/client/java/message/MessageImpl.java | 27 ++-
.../rocketmq/client/java/message/MessageType.java | 5 +
.../client/java/message/MessageViewImpl.java | 20 +-
.../client/java/message/PublishingMessageImpl.java | 7 +
.../rocketmq/client/java/route/Endpoints.java | 10 +-
.../apache/rocketmq/client/java/rpc/RpcClient.java | 14 ++
.../rocketmq/client/java/rpc/RpcClientImpl.java | 12 ++
.../client/java/ClientServiceProviderImplTest.java | 7 +
.../client/java/exception/StatusCheckerTest.java | 27 +++
.../client/java/impl/ClientManagerImplTest.java | 10 +
.../client/java/impl/ClientSessionImplTest.java | 24 +++
.../consumer/LitePushConsumerBuilderImplTest.java | 124 ++++++++++++
.../impl/consumer/LitePushConsumerImplTest.java | 125 ++++++++++++
.../java/impl/consumer/ProcessQueueImplTest.java | 2 +-
.../consumer/PushSubscriptionSettingsTest.java | 43 ++--
.../java/message/GeneralMessageImplTest.java | 132 ++++++++++++-
.../client/java/message/MessageImplTest.java | 17 ++
.../apache/rocketmq/client/java/tool/TestBase.java | 2 +-
java/pom.xml | 4 +-
java/test/pom.xml | 2 +-
53 files changed, 1589 insertions(+), 146 deletions(-)
diff --git a/java/client-apis/pom.xml b/java/client-apis/pom.xml
index 807af5a6..57f9a55e 100644
--- a/java/client-apis/pom.xml
+++ b/java/client-apis/pom.xml
@@ -18,7 +18,7 @@
<parent>
<artifactId>rocketmq-client-java-parent</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.0.9-SNAPSHOT</version>
+ <version>5.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
index 523f1f8c..723ca2e2 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.apis;
import java.util.Iterator;
import java.util.ServiceLoader;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
@@ -59,6 +60,13 @@ public interface ClientServiceProvider {
*/
PushConsumerBuilder newPushConsumerBuilder();
+ /**
+ * Get the lite push consumer builder by the current provider.
+ *
+ * @return the lite push consumer builder instance.
+ */
+ LitePushConsumerBuilder newLitePushConsumerBuilder();
+
/**
* Get the simple consumer builder by the current provider.
*
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
new file mode 100644
index 00000000..2cf276b3
--- /dev/null
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.apis.consumer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import org.apache.rocketmq.client.apis.ClientException;
+
+public interface LitePushConsumer extends Closeable {
+
+ /**
+ * Subscribe to a lite topic.
+ * <p>
+ * The subscribeLite() method initiates network requests and performs
quota verification, so it may fail.
+ * It's important to check the result of this call to ensure that the
subscription was successfully added.
+ * Possible failure scenarios include:
+ * 1. Network request errors, which can be retried.
+ * 2. Quota verification failures, indicated by
LiteSubscriptionQuotaExceededException. In this case,
+ * evaluate whether the quota is insufficient and promptly unsubscribe
from unused subscriptions
+ * using unsubscribeLite() to free up resources.
+ *
+ * @param liteTopic the name of the lite topic to subscribe to
+ * @throws ClientException if an error occurs during subscription
+ */
+ void subscribeLite(String liteTopic) throws ClientException;
+
+ /**
+ * Unsubscribe from a lite topic.
+ *
+ * @param liteTopic the name of the lite topic to unsubscribe from
+ * @throws ClientException if an error occurs during unsubscription
+ */
+ void unsubscribeLite(String liteTopic) throws ClientException;
+
+ /**
+ * Get the lite topic immutable set.
+ *
+ * @return lite topic immutable set.
+ */
+ Set<String> getLiteTopicSet();
+
+ /**
+ * Get the load balancing group for the consumer.
+ *
+ * @return consumer load balancing group.
+ */
+ String getConsumerGroup();
+
+ /**
+ * Close the consumer and release all related resources.
+ *
+ * <p>Once the consumer is closed, <strong>it cannot be started
again.</strong> An FSM
+ * (finite-state machine) is maintained to record the different states of each push
consumer.
+ */
+ @Override
+ void close() throws IOException;
+}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumerBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumerBuilder.java
new file mode 100644
index 00000000..3e526bc2
--- /dev/null
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumerBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.apis.consumer;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+
+/**
+ * Builder to config and start {@link LitePushConsumer}.
+ */
+public interface LitePushConsumerBuilder {
+
+ /**
+ * Set the bind topic for lite push consumer.
+ *
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder bindTopic(String bindTopic);
+
+ /**
+ * Set the client configuration for the consumer.
+ *
+ * @param clientConfiguration client's configuration.
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder setClientConfiguration(ClientConfiguration
clientConfiguration);
+
+ /**
+ * Set the load balancing group for the consumer.
+ *
+ * @param consumerGroup consumer load balancing group.
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder setConsumerGroup(String consumerGroup);
+
+ /**
+ * Register the message listener; all messages that meet the subscription
expression will pass through this listener.
+ *
+ * @param listener message listener.
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder setMessageListener(MessageListener listener);
+
+ /**
+ * Set the maximum number of messages cached locally.
+ *
+ * @param count message count.
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder setMaxCacheMessageCount(int count);
+
+ /**
+ * Set the maximum bytes of messages cached locally.
+ *
+ * @param bytes message size.
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes);
+
+ /**
+ * Set the consumption thread count in parallel.
+ *
+ * @param count thread count.
+ * @return the consumer builder instance.
+ */
+ LitePushConsumerBuilder setConsumptionThreadCount(int count);
+
+ /**
+ * Finalize the build of {@link LitePushConsumer} and start.
+ *
+ * <p>This method will block until the push consumer starts successfully.
+ *
+ * <p>Especially, if this method is invoked more than once, different push
consumers will be created and started.
+ *
+ * @return the lite push consumer instance.
+ */
+ LitePushConsumer build() throws ClientException;
+}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
index d08b9c7a..e82be932 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
@@ -72,6 +72,13 @@ public interface Message {
*/
Optional<String> getMessageGroup();
+ /**
+ * Get the lite topic, which is used for lite topic message type.
+ *
+ * @return lite topic, which is optional, {@link Optional#empty()} means
lite topic is not specified.
+ */
+ Optional<String> getLiteTopic();
+
/**
* Get the expected delivery timestamp, which make sense only when topic
type is delay.
*
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
index 8275e881..3ccea469 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
@@ -87,6 +87,14 @@ public interface MessageBuilder {
*/
MessageBuilder setMessageGroup(String messageGroup);
+ /**
+ * Set the lite topic for the message, which is optional.
+ *
+ * @param liteTopic lite topic for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setLiteTopic(String liteTopic);
+
/**
* Set the delivery timestamp for the message, which is optional.
*
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
index dded0d4b..2f7909ae 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
@@ -79,6 +79,13 @@ public interface MessageView {
*/
Optional<String> getMessageGroup();
+ /**
+ * Get the lite topic, which makes sense only when the topic type is LITE.
+ *
+ * @return lite topic, which is optional, {@link Optional#empty()} means
lite topic is not specified.
+ */
+ Optional<String> getLiteTopic();
+
/**
* Get the expected delivery timestamp, which makes sense only when the
topic type is delay.
*
diff --git a/java/client-shade/pom.xml b/java/client-shade/pom.xml
index 57d633ea..241da9cf 100644
--- a/java/client-shade/pom.xml
+++ b/java/client-shade/pom.xml
@@ -18,7 +18,7 @@
<parent>
<artifactId>rocketmq-client-java-parent</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.0.9-SNAPSHOT</version>
+ <version>5.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/java/client/pom.xml b/java/client/pom.xml
index 11b2f424..2d16677c 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -18,7 +18,7 @@
<parent>
<artifactId>rocketmq-client-java-parent</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.0.9-SNAPSHOT</version>
+ <version>5.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LiteProducerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LiteProducerExample.java
new file mode 100644
index 00000000..e841f28c
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LiteProducerExample.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import
org.apache.rocketmq.client.java.exception.LiteTopicQuotaExceededException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LiteProducerExample {
+ static final Logger log =
LoggerFactory.getLogger(LiteProducerExample.class);
+
+ private LiteProducerExample() {
+ }
+
+ public static void main(String[] args) throws ClientException {
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+
+ String topic = "yourParentTopic";
+ final Producer producer = ProducerSingleton.getInstance(topic);
+ // Define your message body.
+ byte[] body = "This is a lite message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
+ final Message message = provider.newMessageBuilder()
+ // Set topic for the current message.
+ .setTopic(topic)
+ // Key(s) of the message, another way to mark message besides
message id.
+ .setKeys("yourMessageKey-3ee439f945d7")
+ // Set your lite topic
+ .setLiteTopic("lite-topic-1")
+ .setBody(body)
+ .build();
+ try {
+ final SendReceipt sendReceipt = producer.send(message);
+ log.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ } catch (LiteTopicQuotaExceededException e) {
+ // Lite topic quota exceeded.
+ // Evaluate and increase the lite topic resource limit.
+ log.error("Lite topic quota exceeded", e);
+ } catch (Throwable t) {
+ log.error("Failed to send message", t);
+ }
+ // Close the producer when you don't need it anymore.
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
+ }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
new file mode 100644
index 00000000..ff990d4c
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LitePushConsumerExample {
+ private static final Logger log =
LoggerFactory.getLogger(LitePushConsumerExample.class);
+
+ private LitePushConsumerExample() {
+ }
+
+ public static void main(String[] args) throws ClientException,
InterruptedException, IOException {
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+
+ // Credential provider is optional for client configuration.
+ String accessKey = "yourAccessKey";
+ String secretKey = "yourSecretKey";
+ SessionCredentialsProvider sessionCredentialsProvider =
+ new StaticSessionCredentialsProvider(accessKey, secretKey);
+
+ String endpoints = "foobar.com:8080";
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ // On some Windows platforms, you may encounter SSL compatibility
issues. If SSL is not essential, try turning
+ // off the SSL option in the client configuration to solve the
problem.
+ // .enableSsl(false)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .build();
+ String consumerGroup = "yourConsumerGroup";
+ String topic = "yourParentTopic";
+ // In most cases, you don't need to create too many consumers;
the singleton pattern is recommended.
+ LitePushConsumer litePushConsumer =
provider.newLitePushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ // Set the consumer group name.
+ .setConsumerGroup(consumerGroup)
+ // Bind to the parent topic
+ .bindTopic(topic)
+ .setMessageListener(messageView -> {
+ // Handle the received message and return consume result.
+ log.info("Consume message={}", messageView);
+ return ConsumeResult.SUCCESS;
+ })
+ .build();
+
+ try {
+ /*
+ The subscribeLite() method initiates network requests and performs
quota verification, so it may fail.
+ It's important to check the result of this call to ensure that the
subscription was successfully added.
+ Possible failure scenarios include:
+ 1. Network request errors, which can be retried.
+ 2. Quota verification failures, indicated by
LiteSubscriptionQuotaExceededException. In this case,
+ evaluate whether the quota is insufficient and promptly
unsubscribe from unused subscriptions
+ using unsubscribeLite() to free up resources.
+ */
+ litePushConsumer.subscribeLite("lite-topic-1");
+ litePushConsumer.subscribeLite("lite-topic-2");
+ litePushConsumer.subscribeLite("lite-topic-3");
+ } catch (LiteSubscriptionQuotaExceededException e) {
+ // 1. Evaluate and increase the lite topic resource limit.
+ // 2. Unsubscribe unused lite topics in time
+ // litePushConsumer.unsubscribeLite("lite-topic-3");
+ log.error("Lite subscription quota exceeded", e);
+ } catch (Throwable t) {
+ // should retry later
+ log.error("Failed to subscribe lite topic", t);
+ }
+
+ // Block the main thread, no need for production environment.
+ Thread.sleep(Long.MAX_VALUE);
+ // Close the push consumer when you don't need it anymore.
+ // You could close it manually or add this into the JVM shutdown hook.
+ litePushConsumer.close();
+ }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteSubscriptionQuotaExceededException.java
similarity index 57%
copy from
java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
copy to
java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteSubscriptionQuotaExceededException.java
index d514a8ac..eb17a279 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteSubscriptionQuotaExceededException.java
@@ -15,23 +15,12 @@
* limitations under the License.
*/
-package org.apache.rocketmq.client.java.impl;
+package org.apache.rocketmq.client.java.exception;
-public enum ClientType {
- PRODUCER,
- PUSH_CONSUMER,
- SIMPLE_CONSUMER;
+import org.apache.rocketmq.client.apis.ClientException;
- public apache.rocketmq.v2.ClientType toProtobuf() {
- if (PRODUCER.equals(this)) {
- return apache.rocketmq.v2.ClientType.PRODUCER;
- }
- if (PUSH_CONSUMER.equals(this)) {
- return apache.rocketmq.v2.ClientType.PUSH_CONSUMER;
- }
- if (SIMPLE_CONSUMER.equals(this)) {
- return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
- }
- return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
+public class LiteSubscriptionQuotaExceededException extends ClientException {
+ public LiteSubscriptionQuotaExceededException(int responseCode, String
requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteTopicQuotaExceededException.java
similarity index 57%
copy from
java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
copy to
java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteTopicQuotaExceededException.java
index d514a8ac..72494bf8 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteTopicQuotaExceededException.java
@@ -15,23 +15,12 @@
* limitations under the License.
*/
-package org.apache.rocketmq.client.java.impl;
+package org.apache.rocketmq.client.java.exception;
-public enum ClientType {
- PRODUCER,
- PUSH_CONSUMER,
- SIMPLE_CONSUMER;
+import org.apache.rocketmq.client.apis.ClientException;
- public apache.rocketmq.v2.ClientType toProtobuf() {
- if (PRODUCER.equals(this)) {
- return apache.rocketmq.v2.ClientType.PRODUCER;
- }
- if (PUSH_CONSUMER.equals(this)) {
- return apache.rocketmq.v2.ClientType.PUSH_CONSUMER;
- }
- if (SIMPLE_CONSUMER.equals(this)) {
- return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
- }
- return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
+public class LiteTopicQuotaExceededException extends ClientException {
+ public LiteTopicQuotaExceededException(int responseCode, String requestId,
String message) {
+ super(responseCode, requestId, message);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
index 58e9cfc0..746e49da 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
@@ -48,6 +48,7 @@ public class StatusChecker {
case ILLEGAL_MESSAGE_TAG:
case ILLEGAL_MESSAGE_KEY:
case ILLEGAL_MESSAGE_GROUP:
+ case ILLEGAL_LITE_TOPIC:
case ILLEGAL_MESSAGE_PROPERTY_KEY:
case INVALID_TRANSACTION_ID:
case ILLEGAL_MESSAGE_ID:
@@ -83,6 +84,10 @@ public class StatusChecker {
throw new PayloadEmptyException(codeNumber, requestId,
statusMessage);
case TOO_MANY_REQUESTS:
throw new TooManyRequestsException(codeNumber, requestId,
statusMessage);
+ case LITE_TOPIC_QUOTA_EXCEEDED:
+ throw new LiteTopicQuotaExceededException(codeNumber,
requestId, statusMessage);
+ case LITE_SUBSCRIPTION_QUOTA_EXCEEDED:
+ throw new LiteSubscriptionQuotaExceededException(codeNumber,
requestId, statusMessage);
case REQUEST_HEADER_FIELDS_TOO_LARGE:
case MESSAGE_PROPERTIES_TOO_LARGE:
throw new RequestHeaderFieldsTooLargeException(codeNumber,
requestId, statusMessage);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 80e943fe..f17832bf 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -24,6 +24,7 @@ import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
@@ -466,6 +467,17 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
+ "command={}", clientId, command);
}
+ /**
+ * This method is invoked while request of unsubscribe lite topic is
received from remote.
+ * @param endpoints remote endpoints.
+ * @param command request of unsubscribe lite topic from remote.
+ */
+ @Override
+ public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints,
NotifyUnsubscribeLiteCommand command) {
+ log.warn("Ignore unsubscribe lite topic command from remote, which is
not expected, clientId={}, "
+ + "command={}", clientId, command);
+ }
+
private void updateRouteCache() {
log.info("Start to update route cache for a new round, clientId={}",
clientId);
topicRouteCache.keySet().forEach(topic -> {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index e9ff2076..d8f5c22f 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -39,6 +39,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.AbstractIdleService;
import io.grpc.stub.StreamObserver;
@@ -182,6 +184,20 @@ public abstract class ClientManager extends
AbstractIdleService {
public abstract RpcFuture<RecallMessageRequest, RecallMessageResponse>
recallMessage(Endpoints endpoints,
RecallMessageRequest request, Duration duration);
+ /**
+  * Syncs the lite subscription to the given endpoints asynchronously.
+  *
+  * <p>The method ensures no throwable: callers observe the outcome through the returned future.
+  *
+  * @param endpoints request endpoints.
+  * @param request   lite subscription sync request.
+  * @param duration  request max duration.
+  * @return invocation of response future.
+  */
+ public abstract RpcFuture<SyncLiteSubscriptionRequest,
SyncLiteSubscriptionResponse> syncLiteSubscription(
+ Endpoints endpoints,
+ SyncLiteSubscriptionRequest request,
+ Duration duration
+ );
+
/**
* Establish telemetry session stream to server.
*
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 76cfb617..d0fd49bb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -39,6 +39,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -361,6 +363,24 @@ public class ClientManagerImpl extends ClientManager {
}
}
+ /**
+  * Signs the request, issues the lite subscription sync RPC against the given endpoints and wraps
+  * the pending response in an {@link RpcFuture}.
+  *
+  * <p>Any throwable raised while preparing or issuing the call is captured and returned as a failed
+  * {@link RpcFuture} instead of being propagated.
+  *
+  * @param endpoints request endpoints.
+  * @param request   lite subscription sync request.
+  * @param duration  request max duration.
+  * @return invocation of response future.
+  */
+ @Override
+ public RpcFuture<SyncLiteSubscriptionRequest, SyncLiteSubscriptionResponse> syncLiteSubscription(
+     Endpoints endpoints,
+     SyncLiteSubscriptionRequest request,
+     Duration duration
+ ) {
+     try {
+         final Metadata signedMetadata = client.sign();
+         final Context rpcContext = new Context(endpoints, signedMetadata);
+         final ListenableFuture<SyncLiteSubscriptionResponse> responseFuture = getRpcClient(endpoints)
+             .syncLiteSubscription(signedMetadata, request, asyncWorker, duration);
+         return new RpcFuture<>(rpcContext, request, responseFuture);
+     } catch (Throwable failure) {
+         // Keep the "no throwable" contract: surface the failure through the future.
+         return new RpcFuture<>(failure);
+     }
+ }
+
@Override
public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
Duration duration,
StreamObserver<TelemetryCommand> responseObserver) throws
ClientException {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
index f502b8f0..ee75bad7 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
@@ -18,10 +18,12 @@
package org.apache.rocketmq.client.java.impl;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import
org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
@@ -37,7 +39,7 @@ public class ClientServiceProviderImpl implements
ClientServiceProvider {
}
/**
- * @see ClientServiceProvider#newMessageBuilder()
+ * @see ClientServiceProvider#newPushConsumerBuilder()
*/
@Override
public PushConsumerBuilder newPushConsumerBuilder() {
@@ -45,7 +47,15 @@ public class ClientServiceProviderImpl implements
ClientServiceProvider {
}
/**
- * @see ClientServiceProvider#newMessageBuilder()
+ * @see ClientServiceProvider#newLitePushConsumerBuilder()
+ */
+ @Override
+ public LitePushConsumerBuilder newLitePushConsumerBuilder() {
+ return new LitePushConsumerBuilderImpl();
+ }
+
+ /**
+ * @see ClientServiceProvider#newSimpleConsumerBuilder()
*/
@Override
public SimpleConsumerBuilder newSimpleConsumerBuilder() {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 29dd770c..f0c7de3e 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.java.impl;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.ReconnectEndpointsCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
@@ -46,7 +47,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
private final ClientSessionHandler sessionHandler;
private final Endpoints endpoints;
- private final SettableFuture<Settings> future;
+ private final SettableFuture<Settings> settingsInitFuture;
private volatile StreamObserver<TelemetryCommand> requestObserver;
@SuppressWarnings("UnstableApiUsage")
@@ -54,8 +55,8 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
throws ClientException {
this.sessionHandler = sessionHandler;
this.endpoints = endpoints;
- this.future = SettableFuture.create();
- Futures.withTimeout(future,
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
+ this.settingsInitFuture = SettableFuture.create();
+ Futures.withTimeout(settingsInitFuture,
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
TimeUnit.MILLISECONDS, sessionHandler.getScheduler());
this.requestObserver = sessionHandler.telemetry(endpoints, this);
}
@@ -85,7 +86,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
protected ListenableFuture<Settings> syncSettings() {
this.syncSettings0();
- return future;
+ return settingsInitFuture;
}
private void syncSettings0() {
@@ -129,7 +130,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
final Settings settings = command.getSettings();
log.info("Receive settings from remote, endpoints={},
clientId={}", endpoints, clientId);
sessionHandler.onSettingsCommand(endpoints, settings);
- if (future.set(settings)) {
+ if (settingsInitFuture.set(settings)) {
log.info("Init settings successfully, endpoints={},
clientId={}", endpoints, clientId);
}
break;
@@ -165,6 +166,14 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
sessionHandler.onReconnectEndpointsCommand(endpoints,
reconnectEndpointsCommand);
break;
}
+ case NOTIFY_UNSUBSCRIBE_LITE_COMMAND: {
+ final NotifyUnsubscribeLiteCommand
notifyUnsubscribeLiteCommand =
+ command.getNotifyUnsubscribeLiteCommand();
+ log.info("Receive notify unsubscribe lite command from
remote, endpoints={}, "
+ + "clientId={}", endpoints, clientId);
+ sessionHandler.onNotifyUnsubscribeLiteCommand(endpoints,
notifyUnsubscribeLiteCommand);
+ break;
+ }
default:
log.warn("Receive unrecognized command from remote,
endpoints={}, command={}, clientId={}",
endpoints, command, clientId);
@@ -182,6 +191,8 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
throwable);
release();
if (!sessionHandler.isRunning()) {
+ // first time to sync settings, forward the exception to upper
layer
+ settingsInitFuture.setException(throwable);
log.info("Session handler is not running, forgive to renew request
observer, clientId={}, "
+ "endpoints={}", clientId, endpoints);
return;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
index d514a8ac..4eba5299 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl;
public enum ClientType {
PRODUCER,
PUSH_CONSUMER,
+ LITE_PUSH_CONSUMER,
SIMPLE_CONSUMER;
public apache.rocketmq.v2.ClientType toProtobuf() {
@@ -29,6 +30,9 @@ public enum ClientType {
if (PUSH_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.PUSH_CONSUMER;
}
+ if (LITE_PUSH_CONSUMER.equals(this)) {
+ return apache.rocketmq.v2.ClientType.LITE_PUSH_CONSUMER;
+ }
if (SIMPLE_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index 88b335c8..cc21b8b2 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -55,6 +55,10 @@ public abstract class Settings {
return retryPolicy;
}
+ /**
+  * Returns the {@link ClientType} stored in these settings.
+  *
+  * @return the client type.
+  */
+ public ClientType getClientType() {
+ return clientType;
+ }
+
@ExcludeFromJacocoGeneratedReport
@Override
public String toString() {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 795c2cac..b3aee647 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -24,6 +24,7 @@ import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
@@ -53,6 +54,7 @@ import
org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.impl.ClientManager;
+import org.apache.rocketmq.client.java.impl.ClientType;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -127,10 +129,13 @@ abstract class ConsumerImpl extends ClientImpl {
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic())
.build();
- final AckMessageEntry entry = AckMessageEntry.newBuilder()
+ final AckMessageEntry.Builder builder = AckMessageEntry.newBuilder()
.setMessageId(messageView.getMessageId().toString())
- .setReceiptHandle(messageView.getReceiptHandle())
- .build();
+ .setReceiptHandle(messageView.getReceiptHandle());
+ if (ClientType.LITE_PUSH_CONSUMER == getSettings().getClientType()) {
+ messageView.getLiteTopic().ifPresent(builder::setLiteTopic);
+ }
+ final AckMessageEntry entry = builder.build();
return
AckMessageRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.addEntries(entry).build();
}
@@ -267,4 +272,10 @@ abstract class ConsumerImpl extends ClientImpl {
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
}
+
+ /**
+  * Builds the heartbeat request of this consumer, carrying its consumer group and the client type
+  * taken from its settings.
+  *
+  * @return the heartbeat request.
+  */
+ @Override
+ public HeartbeatRequest wrapHeartbeatRequest() {
+     final HeartbeatRequest.Builder heartbeat = HeartbeatRequest.newBuilder();
+     heartbeat.setGroup(getProtobufGroup());
+     heartbeat.setClientType(getSettings().getClientType().toProtobuf());
+     return heartbeat.build();
+ }
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java
new file mode 100644
index 00000000..0f11eddc
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.CONSUMER_GROUP_PATTERN;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+
+/**
+ * Implementation of {@link LitePushConsumerBuilder}.
+ *
+ * <p>Collects the configuration of a {@link LitePushConsumer}. {@link #build()} verifies that the
+ * mandatory values were supplied, then creates the consumer and waits until it is running.
+ */
+public class LitePushConsumerBuilderImpl implements LitePushConsumerBuilder {
+
+    protected String bindTopic;
+    // below is same as PushConsumerBuilderImpl
+    protected ClientConfiguration clientConfiguration;
+    protected String consumerGroup;
+    protected Map<String, FilterExpression> subscriptionExpressions;
+    protected MessageListener messageListener;
+    protected int maxCacheMessageCount = 1024;
+    protected int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
+    protected int consumptionThreadCount = 20;
+
+    /**
+     * Sets the topic the lite push consumer is bound to.
+     *
+     * @param bindTopic topic name, must not be blank.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder bindTopic(String bindTopic) {
+        final boolean notBlank = StringUtils.isNotBlank(bindTopic);
+        checkArgument(notBlank, "bindTopic should not be blank");
+        this.bindTopic = bindTopic;
+        return this;
+    }
+
+    /**
+     * Sets the client configuration.
+     *
+     * @param clientConfiguration client configuration, must not be null.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration) {
+        checkNotNull(clientConfiguration, "clientConfiguration should not be null");
+        this.clientConfiguration = clientConfiguration;
+        return this;
+    }
+
+    /**
+     * Sets the consumer group.
+     *
+     * @param consumerGroup group name, must be non-null and match {@code CONSUMER_GROUP_PATTERN}.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder setConsumerGroup(String consumerGroup) {
+        checkNotNull(consumerGroup, "consumerGroup should not be null");
+        final boolean wellFormed = CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches();
+        checkArgument(wellFormed, "consumerGroup does not match the "
+            + "regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
+        this.consumerGroup = consumerGroup;
+        return this;
+    }
+
+    /**
+     * Sets the listener that consumes the delivered messages.
+     *
+     * @param messageListener listener, must not be null.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder setMessageListener(MessageListener messageListener) {
+        checkNotNull(messageListener, "messageListener should not be null");
+        this.messageListener = messageListener;
+        return this;
+    }
+
+    /**
+     * Sets the maximum number of cached messages.
+     *
+     * @param maxCachedMessageCount positive message count.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder setMaxCacheMessageCount(int maxCachedMessageCount) {
+        final boolean positive = maxCachedMessageCount > 0;
+        checkArgument(positive, "maxCachedMessageCount should be positive");
+        this.maxCacheMessageCount = maxCachedMessageCount;
+        return this;
+    }
+
+    /**
+     * Sets the maximum total size of cached messages.
+     *
+     * @param maxCacheMessageSizeInBytes positive size in bytes.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) {
+        final boolean positive = maxCacheMessageSizeInBytes > 0;
+        checkArgument(positive, "maxCacheMessageSizeInBytes should be positive");
+        this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+        return this;
+    }
+
+    /**
+     * Sets the number of consumption threads.
+     *
+     * @param consumptionThreadCount positive thread count.
+     * @return this builder.
+     */
+    @Override
+    public LitePushConsumerBuilder setConsumptionThreadCount(int consumptionThreadCount) {
+        final boolean positive = consumptionThreadCount > 0;
+        checkArgument(positive, "consumptionThreadCount should be positive");
+        this.consumptionThreadCount = consumptionThreadCount;
+        return this;
+    }
+
+    /**
+     * Creates the lite push consumer and blocks until it is running.
+     *
+     * @return the running consumer.
+     * @throws ClientException declared by the builder contract.
+     */
+    @Override
+    public LitePushConsumer build() throws ClientException {
+        checkNotNull(clientConfiguration, "clientConfiguration has not been set yet");
+        checkNotNull(consumerGroup, "consumerGroup has not been set yet");
+        checkNotNull(messageListener, "messageListener has not been set yet");
+        checkNotNull(bindTopic, "bindTopic has not been set yet");
+        // passing bindTopic through subscriptionExpressions to ClientImpl
+        subscriptionExpressions = ImmutableMap.of(bindTopic, FilterExpression.SUB_ALL);
+        final LitePushConsumerImpl consumer = new LitePushConsumerImpl(this);
+        consumer.startAsync();
+        consumer.awaitRunning();
+        return consumer;
+    }
+
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
new file mode 100644
index 00000000..30429a8f
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.LiteSubscriptionAction;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
+import apache.rocketmq.v2.ReceiveMessageRequest;
+import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
+import org.apache.rocketmq.client.java.exception.StatusChecker;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link LitePushConsumer}.
+ *
+ * <p>Extends {@link PushConsumerImpl} with a set of lite topics kept in {@link LitePushConsumerSettings}.
+ * Individual changes are sent to the server with {@code PARTIAL_ADD} / {@code PARTIAL_REMOVE}; the complete
+ * set is additionally re-sent periodically with {@code COMPLETE_ADD}.
+ */
+public class LitePushConsumerImpl extends PushConsumerImpl implements LitePushConsumer {
+    private static final Logger log = LoggerFactory.getLogger(LitePushConsumerImpl.class);
+
+    /** Initial delay and period, in seconds, of the periodic complete subscription sync. */
+    private static final long SYNC_ALL_INTERVAL_SECONDS = 30;
+
+    private volatile ScheduledFuture<?> syncAllScheduledFuture;
+    private final LitePushConsumerSettings litePushConsumerSettings;
+
+    /**
+     * Creates the consumer from the values collected by the builder.
+     *
+     * @param builder builder holding the consumer configuration.
+     */
+    public LitePushConsumerImpl(LitePushConsumerBuilderImpl builder) {
+        super(builder.clientConfiguration, builder.consumerGroup, builder.subscriptionExpressions,
+            builder.messageListener, builder.maxCacheMessageCount, builder.maxCacheMessageSizeInBytes,
+            builder.consumptionThreadCount, false);
+        this.litePushConsumerSettings = new LitePushConsumerSettings(builder, clientId, endpoints);
+    }
+
+    /**
+     * Starts the underlying push consumer, then schedules the periodic complete subscription sync.
+     */
+    @Override
+    protected void startUp() throws Exception {
+        super.startUp();
+        syncAllScheduledFuture = getScheduler().scheduleWithFixedDelay(() -> {
+            try {
+                syncAllLiteSubscription();
+            } catch (Throwable t) {
+                // Swallow after logging so that one failed round does not stop the following rounds.
+                log.error("Schedule syncAllLiteSubscription error, clientId={}", clientId, t);
+            }
+        }, SYNC_ALL_INTERVAL_SECONDS, SYNC_ALL_INTERVAL_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Cancels the periodic sync and then shuts the underlying push consumer down.
+     */
+    @Override
+    protected void shutDown() throws InterruptedException {
+        // Cancel first: the task must not fire while the client is being torn down, and it must be
+        // cancelled even when super.shutDown() exits with an exception.
+        final ScheduledFuture<?> scheduled = syncAllScheduledFuture;
+        if (null != scheduled) {
+            scheduled.cancel(false);
+        }
+        super.shutDown();
+    }
+
+    /**
+     * Subscribes to a lite topic. Does nothing when the lite topic is already subscribed.
+     *
+     * @param liteTopic lite topic to subscribe, must not be blank nor longer than the allowed size.
+     * @throws IllegalStateException    if the consumer is not running.
+     * @throws IllegalArgumentException if the lite topic is blank (including {@code null}) or too long.
+     * @throws ClientException          if the quota is exceeded or the sync with the server fails.
+     */
+    @Override
+    public void subscribeLite(String liteTopic) throws ClientException {
+        checkRunning();
+        // A null topic must fall through to validateLiteTopic and be rejected there, instead of
+        // failing inside the lookup of the subscription set.
+        if (null != liteTopic && litePushConsumerSettings.containsLiteTopic(liteTopic)) {
+            return;
+        }
+        validateLiteTopic(liteTopic);
+        checkLiteSubscriptionQuota(1);
+        ListenableFuture<Void> future =
+            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_ADD, Collections.singleton(liteTopic));
+        try {
+            handleClientFuture(future);
+        } catch (ClientException e) {
+            log.error("Failed to subscribeLite {}", liteTopic, e);
+            throw e;
+        }
+        // Record the lite topic locally only after the sync did not fail.
+        litePushConsumerSettings.addLiteTopic(liteTopic);
+        log.info("SubscribeLite {}, topic={}, group={}, clientId={}",
+            liteTopic, litePushConsumerSettings.bindTopic.getName(), getConsumerGroup(), clientId);
+    }
+
+    /**
+     * Fails when adding {@code delta} lite topics would exceed the client-side subscription quota.
+     *
+     * @param delta number of lite topics about to be added.
+     * @throws LiteSubscriptionQuotaExceededException if the quota would be exceeded.
+     */
+    private void checkLiteSubscriptionQuota(int delta) throws LiteSubscriptionQuotaExceededException {
+        int quota = litePushConsumerSettings.getLiteSubscriptionQuota();
+        if (litePushConsumerSettings.getLiteTopicSetSize() + delta > quota) {
+            throw new LiteSubscriptionQuotaExceededException(
+                Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED_VALUE, null, "Lite subscription quota exceeded " + quota);
+        }
+    }
+
+    /**
+     * Rejects blank lite topics and lite topics longer than the configured maximum size.
+     *
+     * @param liteTopic lite topic to validate.
+     * @throws IllegalArgumentException if the lite topic is invalid.
+     */
+    private void validateLiteTopic(String liteTopic) {
+        if (StringUtils.isBlank(liteTopic)) {
+            throw new IllegalArgumentException("liteTopic is blank");
+        }
+        if (liteTopic.length() > litePushConsumerSettings.getMaxLiteTopicSize()) {
+            String errorMessage = String.format("liteTopic length exceeded max length %d, liteTopic: %s",
+                litePushConsumerSettings.getMaxLiteTopicSize(), liteTopic);
+            throw new IllegalArgumentException(errorMessage);
+        }
+    }
+
+    /**
+     * Unsubscribes from a lite topic. Does nothing when the lite topic is not subscribed, which includes
+     * a {@code null} lite topic.
+     *
+     * @param liteTopic lite topic to unsubscribe.
+     * @throws IllegalStateException if the consumer is not running.
+     * @throws ClientException       if the sync with the server fails.
+     */
+    @Override
+    public void unsubscribeLite(String liteTopic) throws ClientException {
+        checkRunning();
+        // null can never be subscribed; treat it like any other unknown lite topic.
+        if (null == liteTopic || !litePushConsumerSettings.containsLiteTopic(liteTopic)) {
+            return;
+        }
+        ListenableFuture<Void> future =
+            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_REMOVE, Collections.singleton(liteTopic));
+        try {
+            handleClientFuture(future);
+        } catch (ClientException e) {
+            log.error("Failed to unsubscribeLite {}", liteTopic, e);
+            throw e;
+        }
+        litePushConsumerSettings.removeLiteTopic(liteTopic);
+        log.info("UnsubscribeLite {}, topic={}, group={}, clientId={}",
+            liteTopic, litePushConsumerSettings.bindTopic.getName(), getConsumerGroup(), clientId);
+    }
+
+    /**
+     * Returns the lite topics currently recorded as subscribed.
+     *
+     * @return the lite topic set as returned by the settings.
+     */
+    @Override
+    public Set<String> getLiteTopicSet() {
+        return litePushConsumerSettings.getLiteTopicSet();
+    }
+
+    /**
+     * Sends the complete lite topic set to the server with {@code COMPLETE_ADD}.
+     *
+     * @throws ClientException if the current set already exceeds the quota or the sync fails.
+     */
+    @VisibleForTesting
+    protected void syncAllLiteSubscription() throws ClientException {
+        checkLiteSubscriptionQuota(0);
+        final Set<String> set = litePushConsumerSettings.getLiteTopicSet();
+        ListenableFuture<Void> future = syncLiteSubscription(LiteSubscriptionAction.COMPLETE_ADD, set);
+        handleClientFuture(future);
+    }
+
+    /**
+     * Builds a sync request for the bind topic and group and sends it to the endpoints of this client.
+     *
+     * @param action kind of change to apply.
+     * @param diff   lite topics carried by the request.
+     * @return future completing when the server accepted the request.
+     */
+    protected ListenableFuture<Void> syncLiteSubscription(LiteSubscriptionAction action, Collection<String> diff) {
+        SyncLiteSubscriptionRequest request = SyncLiteSubscriptionRequest.newBuilder()
+            .setAction(action)
+            .setTopic(litePushConsumerSettings.bindTopic.toProtobuf())
+            .setGroup(litePushConsumerSettings.group.toProtobuf())
+            .addAllLiteTopicSet(diff)
+            .build();
+        Endpoints endpoints = getEndpoints();
+        return syncLiteSubscription0(endpoints, request);
+    }
+
+    /**
+     * Issues the sync RPC and checks the status of the response.
+     *
+     * @param endpoints target endpoints.
+     * @param request   sync request.
+     * @return future failing when the RPC fails or the status check rejects the response.
+     */
+    protected ListenableFuture<Void> syncLiteSubscription0(Endpoints endpoints, SyncLiteSubscriptionRequest request) {
+        final Duration requestTimeout = clientConfiguration.getRequestTimeout();
+        RpcFuture<SyncLiteSubscriptionRequest, SyncLiteSubscriptionResponse> future =
+            this.getClientManager().syncLiteSubscription(endpoints, request, requestTimeout);
+
+        return Futures.transformAsync(future, response -> {
+            final Status status = response.getStatus();
+            StatusChecker.check(status, future);
+            return Futures.immediateVoidFuture();
+        }, MoreExecutors.directExecutor());
+    }
+
+    /**
+     * Removes the lite topic named by the command from the local subscription set. Commands with a blank
+     * lite topic are logged and otherwise ignored.
+     *
+     * @param endpoints remote endpoints.
+     * @param command   unsubscribe notification from remote.
+     */
+    @Override
+    public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, NotifyUnsubscribeLiteCommand command) {
+        String liteTopic = command.getLiteTopic();
+
+        log.info("notify unsubscribe lite liteTopic={} group={} bindTopic={}",
+            liteTopic, getConsumerGroup(), getSettings().bindTopic);
+
+        if (StringUtils.isBlank(liteTopic)) {
+            return;
+        }
+
+        litePushConsumerSettings.removeLiteTopic(liteTopic);
+    }
+
+    /**
+     * Returns the lite push consumer settings of this client.
+     *
+     * @return the settings.
+     */
+    @Override
+    public LitePushConsumerSettings getSettings() {
+        return litePushConsumerSettings;
+    }
+
+    /**
+     * Builds the receive request for a message queue. The request enables auto renew and carries no
+     * filter expression; a random attempt id is generated when none is given.
+     */
+    @Override
+    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
+        FilterExpression filterExpression, Duration longPollingTimeout, String attemptId) {
+        attemptId = null == attemptId ? UUID.randomUUID().toString() : attemptId;
+        return ReceiveMessageRequest.newBuilder()
+            .setGroup(getProtobufGroup())
+            .setMessageQueue(mq.toProtobuf())
+            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
+            .setBatchSize(batchSize)
+            .setAttemptId(attemptId)
+            .setAutoRenew(true)
+            .build();
+    }
+
+    /**
+     * Ensures the consumer is running.
+     *
+     * @throws IllegalStateException if the consumer is not running.
+     */
+    @VisibleForTesting
+    protected void checkRunning() {
+        if (!this.isRunning()) {
+            log.error("lite push consumer not running, state={}, clientId={}",
+                this.state(), clientId);
+            throw new IllegalStateException("lite push consumer not running");
+        }
+    }
+}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerSettings.java
new file mode 100644
index 00000000..c76a6690
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerSettings.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.Subscription;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.message.protocol.Resource;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Settings of a lite push consumer: the bind topic, the set of subscribed lite topics with a version
+ * stamp, and the client-side limits (subscription quota, maximum lite topic size) taken from the
+ * settings pushed by the server.
+ */
+public class LitePushConsumerSettings extends PushSubscriptionSettings {
+    private static final Logger log = LoggerFactory.getLogger(LitePushConsumerSettings.class);
+    // bindTopic for lite push consumer
+    final Resource bindTopic;
+    private final Set<String> liteTopicSet = ConcurrentHashMap.newKeySet();
+    /**
+     * client-side lite subscription quota limit
+     *
+     * <p>volatile: written by {@link #sync(apache.rocketmq.v2.Settings)} and read by the quota check,
+     * which may happen on different threads (NOTE(review): presumably the telemetry callback thread
+     * versus application threads - confirm).
+     */
+    private volatile int liteSubscriptionQuota;
+    /**
+     * Maximum length of a lite topic name; volatile for the same reason as the quota.
+     */
+    private volatile int maxLiteTopicSize = 64;
+
+    /** Stamp refreshed with the current time whenever the lite topic set actually changes. */
+    private final AtomicLong version = new AtomicLong(System.currentTimeMillis());
+
+    /**
+     * Creates the settings from the values collected by the builder.
+     *
+     * @param builder   builder holding the consumer configuration.
+     * @param clientId  id of the owning client.
+     * @param endpoints access endpoints.
+     */
+    public LitePushConsumerSettings(
+        LitePushConsumerBuilderImpl builder,
+        ClientId clientId,
+        Endpoints endpoints
+    ) {
+        super(builder.clientConfiguration, clientId, ClientType.LITE_PUSH_CONSUMER, endpoints, builder.consumerGroup,
+            builder.subscriptionExpressions);
+        this.bindTopic = new Resource(namespace, builder.bindTopic);
+    }
+
+    /**
+     * Tells whether the lite topic is in the subscription set.
+     *
+     * @param liteTopic lite topic to look up, may be null.
+     * @return true if subscribed; false otherwise, including for null.
+     */
+    public boolean containsLiteTopic(String liteTopic) {
+        // The concurrent key set rejects null lookups, so answer for null here.
+        return null != liteTopic && liteTopicSet.contains(liteTopic);
+    }
+
+    /**
+     * Adds the lite topic to the subscription set and refreshes the version if the set changed.
+     *
+     * @param liteTopic lite topic to add, must not be null.
+     */
+    public void addLiteTopic(String liteTopic) {
+        if (!liteTopicSet.add(liteTopic)) {
+            return;
+        }
+        version.set(System.currentTimeMillis());
+    }
+
+    /**
+     * Removes the lite topic from the subscription set and refreshes the version if the set changed.
+     *
+     * @param liteTopic lite topic to remove; null is ignored.
+     */
+    public void removeLiteTopic(String liteTopic) {
+        // null can never be a member of the set, so there is nothing to remove.
+        if (null == liteTopic || !liteTopicSet.remove(liteTopic)) {
+            return;
+        }
+        version.set(System.currentTimeMillis());
+    }
+
+    /**
+     * Returns an immutable copy of the subscription set.
+     *
+     * @return snapshot of the subscribed lite topics.
+     */
+    public Set<String> getLiteTopicSet() {
+        return ImmutableSet.copyOf(liteTopicSet);
+    }
+
+    public int getLiteSubscriptionQuota() {
+        return liteSubscriptionQuota;
+    }
+
+    public int getMaxLiteTopicSize() {
+        return maxLiteTopicSize;
+    }
+
+    public int getLiteTopicSetSize() {
+        return liteTopicSet.size();
+    }
+
+    public long getVersion() {
+        return version.get();
+    }
+
+    /**
+     * Applies the settings received from the server. The quota and the maximum lite topic size are
+     * only overwritten when the subscription part of the settings carries them.
+     *
+     * @param settings settings received from the server.
+     */
+    @Override
+    public void sync(apache.rocketmq.v2.Settings settings) {
+        super.sync(settings);
+        final Subscription subscription = settings.getSubscription();
+        if (subscription.hasLiteSubscriptionQuota()) {
+            this.liteSubscriptionQuota = subscription.getLiteSubscriptionQuota();
+        }
+        if (subscription.hasMaxLiteTopicSize()) {
+            this.maxLiteTopicSize = subscription.getMaxLiteTopicSize();
+        }
+    }
+
+    @ExcludeFromJacocoGeneratedReport
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("clientId", clientId)
+            .add("clientType", clientType)
+            .add("accessPoint", accessPoint)
+            .add("retryPolicy", retryPolicy)
+            .add("requestTimeout", requestTimeout)
+            .add("group", group)
+            .add("receiveBatchSize", receiveBatchSize)
+            .add("longPollingTimeout", longPollingTimeout)
+            // for lite
+            .add("bindTopic", bindTopic)
+            .add("liteSubscriptionQuota", liteSubscriptionQuota)
+            .add("maxLiteTopicSize", maxLiteTopicSize)
+            .add("interestSet", liteTopicSet)
+            .add("version", version)
+            .toString();
+    }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 6e9146db..c42acda6 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -134,7 +134,7 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public boolean expired() {
- final Duration longPollingTimeout =
consumer.getPushConsumerSettings().getLongPollingTimeout();
+ final Duration longPollingTimeout =
consumer.getSettings().getLongPollingTimeout();
final Duration requestTimeout =
consumer.getClientConfiguration().getRequestTimeout();
final Duration maxIdleDuration =
longPollingTimeout.plus(requestTimeout).multipliedBy(3);
final Duration idleDuration = Duration.ofNanos(System.nanoTime() -
activityNanoTime);
@@ -165,7 +165,7 @@ class ProcessQueueImpl implements ProcessQueue {
private int getReceptionBatchSize() {
int bufferSize = consumer.cacheMessageCountThresholdPerQueue() -
this.cachedMessagesCount();
bufferSize = Math.max(bufferSize, 1);
- return Math.min(bufferSize,
consumer.getPushConsumerSettings().getReceiveBatchSize());
+ return Math.min(bufferSize,
consumer.getSettings().getReceiveBatchSize());
}
@Override
@@ -235,7 +235,7 @@ class ProcessQueueImpl implements ProcessQueue {
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final int batchSize = this.getReceptionBatchSize();
- final Duration longPollingTimeout =
consumer.getPushConsumerSettings().getLongPollingTimeout();
+ final Duration longPollingTimeout =
consumer.getSettings().getLongPollingTimeout();
final ReceiveMessageRequest request =
consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
longPollingTimeout, attemptId);
activityNanoTime = System.nanoTime();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index c43c3d27..9319b235 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -17,11 +17,9 @@
package org.apache.rocketmq.client.java.impl.consumer;
-import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
-import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.Status;
@@ -63,11 +61,10 @@ import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
-import org.apache.rocketmq.client.java.impl.Settings;
+import org.apache.rocketmq.client.java.impl.ClientType;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
-import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.metrics.GaugeObserver;
import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
@@ -95,7 +92,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
final AtomicLong consumptionOkQuantity;
final AtomicLong consumptionErrorQuantity;
- private final ClientConfiguration clientConfiguration;
private final PushSubscriptionSettings pushSubscriptionSettings;
private final String consumerGroup;
private final Map<String /* topic */, FilterExpression>
subscriptionExpressions;
@@ -139,10 +135,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int
consumptionThreadCount,
boolean enableFifoConsumeAccelerator, boolean
enableMessageInterceptorFiltering) {
super(clientConfiguration, consumerGroup,
subscriptionExpressions.keySet());
- this.clientConfiguration = clientConfiguration;
- Resource groupResource = new
Resource(clientConfiguration.getNamespace(), consumerGroup);
- this.pushSubscriptionSettings = new
PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
- endpoints, groupResource, clientConfiguration.getRequestTimeout(),
subscriptionExpressions);
+ this.pushSubscriptionSettings = new
PushSubscriptionSettings(clientConfiguration, clientId,
+ ClientType.PUSH_CONSUMER, endpoints, consumerGroup,
subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
this.cacheAssignments = new ConcurrentHashMap<>();
@@ -181,7 +175,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
@Override
protected void startUp() throws Exception {
try {
- log.info("Begin to start the rocketmq push consumer, clientId={}",
clientId);
+ log.info("Begin to start the rocketmq {}, clientId={}",
getSettings().getClientType(), clientId);
GaugeObserver gaugeObserver = new
ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
this.clientMeterManager.setGaugeObserver(gaugeObserver);
super.startUp();
@@ -195,9 +189,10 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
log.error("Exception raised while scanning the load
assignments, clientId={}", clientId, t);
}
}, 1, 5, TimeUnit.SECONDS);
- log.info("The rocketmq push consumer starts successfully,
clientId={}", clientId);
+ log.info("The rocketmq {} starts successfully, clientId={}",
getSettings().getClientType(), clientId);
} catch (Throwable t) {
- log.error("Exception raised while starting the rocketmq push
consumer, clientId={}", clientId, t);
+ log.error("Exception raised while starting the rocketmq {},
clientId={}",
+ getSettings().getClientType(), clientId, t);
shutDown();
throw t;
}
@@ -214,7 +209,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
*/
@Override
protected void shutDown() throws InterruptedException {
- log.info("Begin to shutdown the rocketmq push consumer, clientId={}",
clientId);
+ log.info("Begin to shutdown the rocketmq {}, clientId={}",
getSettings().getClientType(), clientId);
if (null != scanAssignmentsFuture) {
scanAssignmentsFuture.cancel(false);
}
@@ -225,12 +220,12 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
ExecutorServices.awaitTerminated(consumptionExecutor);
TimeUnit.SECONDS.sleep(1);
super.shutDown();
- log.info("Shutdown the rocketmq push consumer successfully,
clientId={}", clientId);
+ log.info("Shutdown the rocketmq {} successfully, clientId={}",
getSettings().getClientType(), clientId);
}
private void waitingReceiveRequestFinished() {
Duration maxWaitingTime = clientConfiguration.getRequestTimeout()
- .plus(pushSubscriptionSettings.getLongPollingTimeout());
+ .plus(getSettings().getLongPollingTimeout());
long endTime = System.currentTimeMillis() + maxWaitingTime.toMillis();
try {
while (true) {
@@ -251,9 +246,9 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
}
}
- private ConsumeService createConsumeService() {
+ protected ConsumeService createConsumeService() {
final ScheduledExecutorService scheduler =
this.getClientManager().getScheduler();
- if (pushSubscriptionSettings.isFifo()) {
+ if (getSettings().isFifo()) {
log.info("Create FIFO consume service, consumerGroup={},
clientId={}, enableFifoConsumeAccelerator={}",
consumerGroup, clientId, enableFifoConsumeAccelerator);
return new FifoConsumeService(clientId, messageListener,
consumptionExecutor, this,
@@ -271,10 +266,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
return consumerGroup;
}
- public PushSubscriptionSettings getPushConsumerSettings() {
- return pushSubscriptionSettings;
- }
-
/**
* @see PushConsumer#getSubscriptionExpressions()
*/
@@ -386,13 +377,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
return Optional.of(processQueue);
}
- @Override
- public HeartbeatRequest wrapHeartbeatRequest() {
- return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
- .setClientType(ClientType.PUSH_CONSUMER).build();
- }
-
-
@VisibleForTesting
void syncProcessQueue(String topic, Assignments assignments,
FilterExpression filterExpression) {
Set<MessageQueueImpl> latest = new HashSet<>();
@@ -486,7 +470,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
}
@Override
- public Settings getSettings() {
+ public PushSubscriptionSettings getSettings() {
return pushSubscriptionSettings;
}
@@ -573,11 +557,18 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic())
.build();
- return
ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
+
+ ForwardMessageToDeadLetterQueueRequest.Builder builder =
ForwardMessageToDeadLetterQueueRequest.newBuilder()
+ .setGroup(getProtobufGroup())
+ .setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setMessageId(messageView.getMessageId().toString())
.setDeliveryAttempt(messageView.getDeliveryAttempt())
- .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
+ .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts());
+ if (ClientType.LITE_PUSH_CONSUMER == getSettings().getClientType()) {
+ messageView.getLiteTopic().ifPresent(builder::setLiteTopic);
+ }
+ return builder.build();
}
public RpcFuture<ForwardMessageToDeadLetterQueueRequest,
ForwardMessageToDeadLetterQueueResponse>
@@ -630,7 +621,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
}
public RetryPolicy getRetryPolicy() {
- return pushSubscriptionSettings.getRetryPolicy();
+ return getSettings().getRetryPolicy();
}
public ThreadPoolExecutor getConsumptionExecutor() {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 26a66a18..624490d4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -27,6 +27,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.java.impl.ClientType;
@@ -44,16 +45,22 @@ import org.slf4j.LoggerFactory;
public class PushSubscriptionSettings extends Settings {
private static final Logger log =
LoggerFactory.getLogger(PushSubscriptionSettings.class);
- private final Resource group;
- private final Map<String, FilterExpression> subscriptionExpressions;
- private volatile Boolean fifo = false;
- private volatile int receiveBatchSize = 32;
- private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
+ protected final Resource group;
+ protected final Map<String, FilterExpression> subscriptionExpressions;
+ protected volatile Boolean fifo = false;
+ protected volatile int receiveBatchSize = 32;
+ protected volatile Duration longPollingTimeout = Duration.ofSeconds(30);
- public PushSubscriptionSettings(String namespace, ClientId clientId,
Endpoints endpoints, Resource group,
- Duration requestTimeout, Map<String, FilterExpression>
subscriptionExpression) {
- super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints,
requestTimeout);
- this.group = group;
+ public PushSubscriptionSettings(
+ ClientConfiguration configuration,
+ ClientId clientId,
+ ClientType clientType,
+ Endpoints endpoints,
+ String group,
+ Map<String, FilterExpression> subscriptionExpression
+ ) {
+ super(configuration.getNamespace(), clientId, clientType, endpoints,
configuration.getRequestTimeout());
+ this.group = new Resource(configuration.getNamespace(), group);
this.subscriptionExpressions = subscriptionExpression;
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index e73774e5..0ffa7928 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -21,8 +21,6 @@ import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
-import apache.rocketmq.v2.ClientType;
-import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Status;
import com.google.common.math.IntMath;
@@ -154,12 +152,6 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return new HashMap<>(subscriptionExpressions);
}
- @Override
- public HeartbeatRequest wrapHeartbeatRequest() {
- return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
- .setClientType(ClientType.SIMPLE_CONSUMER).build();
- }
-
/**
* @see SimpleConsumer#receive(int, Duration)
*/
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index b6f9c559..924e903d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.java.impl.producer;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.ReconnectEndpointsCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
@@ -103,4 +104,9 @@ public interface ClientSessionHandler {
* Event processor for {@link ReconnectEndpointsCommand}.
*/
void onReconnectEndpointsCommand(Endpoints endpoints,
ReconnectEndpointsCommand command);
+
+ /**
+ * Event processor for {@link NotifyUnsubscribeLiteCommand}.
+ */
+ void onNotifyUnsubscribeLiteCommand(Endpoints endpoints,
NotifyUnsubscribeLiteCommand command);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
index 2e614fd5..827d8654 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
@@ -81,6 +81,13 @@ public interface GeneralMessage {
*/
Optional<String> getMessageGroup();
+ /**
+ * Get the lite topic, which makes sense only when the topic type is LITE.
+ *
+ * @return lite topic, which is optional, {@link Optional#empty()} means
lite topic is not specified.
+ */
+ Optional<String> getLiteTopic();
+
/**
* Get the expected delivery timestamp, which makes sense only when topic
type is delay.
*
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
index ed46be42..ffc390e5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
@@ -35,6 +35,7 @@ public class GeneralMessageImpl implements GeneralMessage {
private final String tag;
private final Collection<String> keys;
private final String messageGroup;
+ private final String liteTopic;
private final Long deliveryTimestamp;
private final String bornHost;
private final Long bornTimestamp;
@@ -58,6 +59,7 @@ public class GeneralMessageImpl implements GeneralMessage {
this.tag = message.getTag().orElse(null);
this.keys = message.getKeys();
this.messageGroup = message.getMessageGroup().orElse(null);
+ this.liteTopic = message.getLiteTopic().orElse(null);
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
this.bornHost = null;
this.bornTimestamp = null;
@@ -92,6 +94,7 @@ public class GeneralMessageImpl implements GeneralMessage {
this.tag = message.getTag().orElse(null);
this.keys = message.getKeys();
this.messageGroup = message.getMessageGroup().orElse(null);
+ this.liteTopic = message.getLiteTopic().orElse(null);
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
this.bornHost = message.getBornHost();
this.bornTimestamp = message.getBornTimestamp();
@@ -136,6 +139,11 @@ public class GeneralMessageImpl implements GeneralMessage {
return Optional.ofNullable(messageGroup);
}
+ @Override
+ public Optional<String> getLiteTopic() {
+ return Optional.ofNullable(liteTopic);
+ }
+
@Override
public Optional<Long> getDeliveryTimestamp() {
return Optional.ofNullable(deliveryTimestamp);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index f5a066b2..21fcc132 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -35,13 +35,14 @@ import org.apache.rocketmq.client.apis.message.MessageBuilder;
public class MessageBuilderImpl implements MessageBuilder {
public static final Pattern TOPIC_PATTERN =
Pattern.compile("^[%a-zA-Z0-9_-]+$");
- private String topic = null;
- private byte[] body = null;
- private String tag = null;
- private String messageGroup = null;
- private Long deliveryTimestamp = null;
- private Collection<String> keys = new HashSet<>();
- private final Map<String, String> properties = new HashMap<>();
+ protected String topic = null;
+ protected byte[] body = null;
+ protected String tag = null;
+ protected String messageGroup = null;
+ protected String liteTopic = null;
+ protected Long deliveryTimestamp = null;
+ protected Collection<String> keys = new HashSet<>();
+ protected final Map<String, String> properties = new HashMap<>();
public MessageBuilderImpl() {
}
@@ -98,17 +99,28 @@ public class MessageBuilderImpl implements MessageBuilder {
@Override
public MessageBuilder setMessageGroup(String messageGroup) {
checkArgument(null == deliveryTimestamp, "messageGroup and
deliveryTimestamp should not be set at same time");
+ checkArgument(null == liteTopic, "messageGroup and liteTopic should
not be set at same time");
checkArgument(StringUtils.isNotBlank(messageGroup), "messageGroup
should not be blank");
this.messageGroup = messageGroup;
return this;
}
+ @Override
+ public MessageBuilder setLiteTopic(String liteTopic) {
+ checkArgument(null == deliveryTimestamp, "liteTopic and
deliveryTimestamp should not be set at same time");
+ checkArgument(null == messageGroup, "liteTopic and messageGroup should
not be set at same time");
+ checkArgument(StringUtils.isNotBlank(liteTopic), "liteTopic should not
be blank");
+ this.liteTopic = liteTopic;
+ return this;
+ }
+
/**
* See {@link MessageBuilder#setDeliveryTimestamp(long)}
*/
@Override
public MessageBuilder setDeliveryTimestamp(long deliveryTimestamp) {
checkArgument(null == messageGroup, "deliveryTimestamp and
messageGroup should not be set at same time");
+ checkArgument(null == liteTopic, "deliveryTimestamp and liteTopic
should not be set at same time");
this.deliveryTimestamp = deliveryTimestamp;
return this;
}
@@ -131,6 +143,6 @@ public class MessageBuilderImpl implements MessageBuilder {
public Message build() {
checkNotNull(topic, "topic has not been set yet");
checkNotNull(body, "body has not been set yet");
- return new MessageImpl(topic, body, tag, keys, messageGroup,
deliveryTimestamp, properties);
+ return new MessageImpl(this);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 581880d7..adbec6c6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -46,22 +46,23 @@ public class MessageImpl implements Message {
@Nullable
private final String messageGroup;
@Nullable
+ private final String liteTopic;
+ @Nullable
private final Long deliveryTimestamp;
/**
* The caller is supposed to have validated the arguments and handled
throwing exception or
* logging warnings already, so we avoid repeating args check here.
*/
- MessageImpl(String topic, byte[] body, @Nullable String tag,
Collection<String> keys,
- @Nullable String messageGroup, @Nullable Long deliveryTimestamp,
- Map<String, String> properties) {
- this.topic = topic;
- this.body = body;
- this.tag = tag;
- this.messageGroup = messageGroup;
- this.deliveryTimestamp = deliveryTimestamp;
- this.keys = keys;
- this.properties = properties;
+ MessageImpl(MessageBuilderImpl builder) {
+ this.topic = builder.topic;
+ this.body = builder.body;
+ this.tag = builder.tag;
+ this.messageGroup = builder.messageGroup;
+ this.liteTopic = builder.liteTopic;
+ this.deliveryTimestamp = builder.deliveryTimestamp;
+ this.keys = builder.keys;
+ this.properties = builder.properties;
}
MessageImpl(Message message) {
@@ -79,6 +80,7 @@ public class MessageImpl implements Message {
this.tag = message.getTag().orElse(null);
this.messageGroup = message.getMessageGroup().orElse(null);
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
+ this.liteTopic = message.getLiteTopic().orElse(null);
this.keys = message.getKeys();
this.properties = message.getProperties();
}
@@ -139,6 +141,11 @@ public class MessageImpl implements Message {
return Optional.ofNullable(messageGroup);
}
+ @Override
+ public Optional<String> getLiteTopic() {
+ return Optional.ofNullable(liteTopic);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
index ab38c87e..5748149e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.message;
public enum MessageType {
NORMAL,
FIFO,
+ LITE,
DELAY,
TRANSACTION;
@@ -29,6 +30,8 @@ public enum MessageType {
return MessageType.NORMAL;
case FIFO:
return MessageType.FIFO;
+ case LITE:
+ return MessageType.LITE;
case DELAY:
return MessageType.DELAY;
case TRANSACTION:
@@ -45,6 +48,8 @@ public enum MessageType {
return apache.rocketmq.v2.MessageType.NORMAL;
case FIFO:
return apache.rocketmq.v2.MessageType.FIFO;
+ case LITE:
+ return apache.rocketmq.v2.MessageType.LITE;
case DELAY:
return apache.rocketmq.v2.MessageType.DELAY;
case TRANSACTION:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index aa9db400..420e3d35 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -51,6 +51,7 @@ public class MessageViewImpl implements MessageView {
private final String topic;
private final String tag;
private final String messageGroup;
+ private final String liteTopic;
private final Long deliveryTimestamp;
private final Collection<String> keys;
private final Map<String, String> properties;
@@ -65,7 +66,8 @@ public class MessageViewImpl implements MessageView {
private final long decodeTimestamp;
private final Long transportDeliveryTimestamp;
- public MessageViewImpl(MessageId messageId, String topic, byte[] body,
String tag, String messageGroup,
+ public MessageViewImpl(MessageId messageId, String topic, byte[] body,
String tag,
+ String messageGroup, String liteTopic,
Long deliveryTimestamp, Collection<String> keys, Map<String, String>
properties,
String bornHost, long bornTimestamp, int deliveryAttempt,
MessageQueueImpl messageQueue,
String receiptHandle, long offset, boolean corrupted,
@@ -75,6 +77,7 @@ public class MessageViewImpl implements MessageView {
this.body = checkNotNull(body, "body should not be null");
this.tag = tag;
this.messageGroup = messageGroup;
+ this.liteTopic = liteTopic;
this.deliveryTimestamp = deliveryTimestamp;
this.keys = checkNotNull(keys, "keys should not be null");
this.properties = checkNotNull(properties, "properties should not be
null");
@@ -146,6 +149,14 @@ public class MessageViewImpl implements MessageView {
return Optional.ofNullable(messageGroup);
}
+ /**
+ * @see MessageView#getLiteTopic()
+ */
+ @Override
+ public Optional<String> getLiteTopic() {
+ return Optional.ofNullable(liteTopic);
+ }
+
/**
* @see MessageView#getDeliveryTimestamp()
*/
@@ -289,6 +300,7 @@ public class MessageViewImpl implements MessageView {
String tag = systemProperties.hasTag() ? systemProperties.getTag() :
null;
String messageGroup = systemProperties.hasMessageGroup() ?
systemProperties.getMessageGroup() : null;
+ String liteTopic = systemProperties.hasLiteTopic() ?
systemProperties.getLiteTopic() : null;
Long deliveryTimestamp = systemProperties.hasDeliveryTimestamp() ?
Timestamps.toMillis(systemProperties.getDeliveryTimestamp()) :
null;
final ProtocolStringList keys = systemProperties.getKeysList();
@@ -298,8 +310,9 @@ public class MessageViewImpl implements MessageView {
final long offset = systemProperties.getQueueOffset();
final Map<String, String> properties = message.getUserPropertiesMap();
final String receiptHandle = systemProperties.getReceiptHandle();
- return new MessageViewImpl(messageId, topic, body, tag, messageGroup,
deliveryTimestamp, keys, properties,
- bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle,
offset, corrupted, transportDeliveryTimestamp);
+ return new MessageViewImpl(messageId, topic, body, tag, messageGroup,
liteTopic, deliveryTimestamp,
+ keys, properties, bornHost, bornTimestamp, deliveryAttempt,
+ mq, receiptHandle, offset, corrupted, transportDeliveryTimestamp);
}
@Override
@@ -314,6 +327,7 @@ public class MessageViewImpl implements MessageView {
.add("tag", tag)
.add("keys", keys)
.add("messageGroup", messageGroup)
+ .add("liteTopic", liteTopic)
.add("deliveryTimestamp", deliveryTimestamp)
.add("properties", properties)
.toString();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 6af6d737..96987a78 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -49,6 +49,7 @@ public class PublishingMessageImpl extends MessageImpl {
this.messageId = MessageIdCodec.getInstance().nextMessageId();
// Normal message.
if (!message.getMessageGroup().isPresent() &&
+ !message.getLiteTopic().isPresent() &&
!message.getDeliveryTimestamp().isPresent() && !txEnabled) {
messageType = MessageType.NORMAL;
return;
@@ -58,6 +59,11 @@ public class PublishingMessageImpl extends MessageImpl {
messageType = MessageType.FIFO;
return;
}
+ // Lite message.
+ if (message.getLiteTopic().isPresent() && !txEnabled) {
+ messageType = MessageType.LITE;
+ return;
+ }
// Delay message.
if (message.getDeliveryTimestamp().isPresent() && !txEnabled) {
messageType = MessageType.DELAY;
@@ -111,6 +117,7 @@ public class PublishingMessageImpl extends MessageImpl {
.ifPresent(millis ->
systemPropertiesBuilder.setDeliveryTimestamp(Timestamps.fromMillis(millis)));
// Message group
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
+ this.getLiteTopic().ifPresent(systemPropertiesBuilder::setLiteTopic);
final SystemProperties systemProperties =
systemPropertiesBuilder.build();
Resource topicResource =
Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 4b734615..0f1ecb3e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -49,6 +49,11 @@ public class Endpoints {
private final String facade;
private final List<Address> addresses;
+ /**
+ * Cache the hash code for the object
+ */
+ private int hash; // Default to 0
+
public Endpoints(apache.rocketmq.v2.Endpoints endpoints) {
this.addresses = new ArrayList<>();
for (apache.rocketmq.v2.Address address :
endpoints.getAddressesList()) {
@@ -202,6 +207,9 @@ public class Endpoints {
@Override
public int hashCode() {
- return Objects.hashCode(scheme, facade, addresses);
+ if (hash == 0) {
+ hash = Objects.hashCode(scheme, facade, addresses);
+ }
+ return hash;
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index c555082f..98f252a5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -39,6 +39,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Metadata;
@@ -200,6 +202,18 @@ public interface RpcClient {
ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata,
RecallMessageRequest request, Executor executor, Duration duration);
+ /**
+ * Sync lite subscription asynchronously.
+ *
+ * @param metadata gRPC request header metadata.
+ * @param request sync lite subscription request
+ * @param executor gRPC asynchronous executor.
+ * @param duration request max duration.
+ * @return invocation of response future.
+ */
+ ListenableFuture<SyncLiteSubscriptionResponse>
syncLiteSubscription(Metadata metadata,
+ SyncLiteSubscriptionRequest request, Executor executor, Duration
duration);
+
/**
* Start a streaming request and get the request observer.
*
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index f71dfb29..1e0225a1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -40,6 +40,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -228,6 +230,16 @@ public class RpcClientImpl implements RpcClient {
.withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).recallMessage(request);
}
+ @Override
+ public ListenableFuture<SyncLiteSubscriptionResponse>
syncLiteSubscription(Metadata metadata,
+ SyncLiteSubscriptionRequest request, Executor executor, Duration
duration) {
+ this.activityNanoTime = System.nanoTime();
+ return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
+ .withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS)
+ .syncLiteSubscription(request);
+ }
+
@Override
public StreamObserver<TelemetryCommand> telemetry(Metadata metadata,
Executor executor, Duration duration,
StreamObserver<TelemetryCommand> responseObserver) {
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
index ca198d9d..cf3669ea 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java;
import static org.junit.Assert.assertEquals;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import
org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
@@ -39,6 +40,12 @@ public class ClientServiceProviderImplTest {
ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
}
+    @Test
+    public void testNewLitePushConsumerBuilder() {
+        // The loaded service provider must hand out the lite push consumer builder implementation.
+        final Class<?> builderClass = ClientServiceProvider.loadService().newLitePushConsumerBuilder().getClass();
+        assertEquals(LitePushConsumerBuilderImpl.class, builderClass);
+    }
+
@Test
public void testNewSimpleConsumerBuilder() {
assertEquals(SimpleConsumerBuilderImpl.class,
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
index 13ec8e37..56605011 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
@@ -110,6 +110,17 @@ public class StatusCheckerTest extends TestBase {
}
}
+ {
+ Status status =
Status.newBuilder().setCode(Code.ILLEGAL_LITE_TOPIC).build();
+ RpcFuture<Object, Object> invocation = new RpcFuture<>(context,
null, Futures.immediateFuture(response));
+ try {
+ StatusChecker.check(status, invocation);
+ fail();
+ } catch (BadRequestException ignore) {
+ // ignore on purpose
+ }
+ }
+
{
Status status =
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_TAG).build();
RpcFuture<Object, Object> invocation = new RpcFuture<>(context,
null, Futures.immediateFuture(response));
@@ -431,6 +442,22 @@ public class StatusCheckerTest extends TestBase {
}
}
+    /** A {@code LITE_TOPIC_QUOTA_EXCEEDED} status must surface as {@link LiteTopicQuotaExceededException}. */
+    @Test(expected = LiteTopicQuotaExceededException.class)
+    public void testLiteTopicQuotaExceeded() throws ClientException {
+        final Context rpcContext = generateContext();
+        final RpcFuture<Object, Object> rpcFuture = new RpcFuture<>(rpcContext, null, null);
+        final Status quotaExceeded = Status.newBuilder()
+            .setCode(Code.LITE_TOPIC_QUOTA_EXCEEDED)
+            .build();
+        StatusChecker.check(quotaExceeded, rpcFuture);
+    }
+
+    /**
+     * A {@code LITE_SUBSCRIPTION_QUOTA_EXCEEDED} status must surface as
+     * {@link LiteSubscriptionQuotaExceededException}.
+     */
+    @Test(expected = LiteSubscriptionQuotaExceededException.class)
+    public void testLiteSubscriptionQuotaExceeded() throws ClientException {
+        final Context rpcContext = generateContext();
+        final RpcFuture<Object, Object> rpcFuture = new RpcFuture<>(rpcContext, null, null);
+        final Status quotaExceeded = Status.newBuilder()
+            .setCode(Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED)
+            .build();
+        StatusChecker.check(quotaExceeded, rpcFuture);
+    }
+
@Test
public void testRequestHeaderFieldsTooLarge() throws ClientException {
Object response = new Object();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index b6ac2ec3..a36c60c7 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -28,6 +28,7 @@ import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.SendMessageRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
import io.grpc.Metadata;
import java.time.Duration;
import org.apache.rocketmq.client.java.misc.ClientId;
@@ -144,4 +145,13 @@ public class ClientManagerImplTest extends TestBase {
CLIENT_MANAGER.recallMessage(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
+
+    @Test
+    public void testSyncLiteSubscription() {
+        final Duration timeout = Duration.ofSeconds(1);
+        final SyncLiteSubscriptionRequest emptyRequest = SyncLiteSubscriptionRequest.newBuilder().build();
+        // Neither the fake endpoints nor null endpoints are expected to make the call throw.
+        CLIENT_MANAGER.syncLiteSubscription(fakeEndpoints(), emptyRequest, timeout);
+        CLIENT_MANAGER.syncLiteSubscription(null, emptyRequest, timeout);
+    }
+
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
index 9f7a58af..0b2e4bf3 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Settings;
@@ -241,4 +242,27 @@ public class ClientSessionImplTest extends TestBase {
Mockito.verify(requestObserver, times(1)).onCompleted();
Mockito.verify(sessionHandler, times(1)).getScheduler();
}
+
+    /**
+     * Verifies that a {@link TelemetryCommand} carrying a {@link NotifyUnsubscribeLiteCommand} is handed to
+     * the session handler's {@code onNotifyUnsubscribeLiteCommand} exactly once, with the session's endpoints.
+     *
+     * @throws ClientException declared by the session API used in this test.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnNextWithNotifyUnsubscribeLiteCommand() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        // Keep a handle on the scheduler so it can be released when the test finishes.
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
+        try {
+            final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+            Mockito.when(sessionHandler.getScheduler()).thenReturn(scheduler);
+            final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
+            Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+                any(StreamObserver.class));
+            final ClientSessionImpl clientSession =
+                new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
+            Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
+            Mockito.doNothing().when(sessionHandler).onNotifyUnsubscribeLiteCommand(any(Endpoints.class),
+                any(NotifyUnsubscribeLiteCommand.class));
+            NotifyUnsubscribeLiteCommand command0 = NotifyUnsubscribeLiteCommand.newBuilder()
+                .setLiteTopic("test-lite-topic")
+                .build();
+            TelemetryCommand command = TelemetryCommand.newBuilder()
+                .setNotifyUnsubscribeLiteCommand(command0).build();
+            clientSession.onNext(command);
+            Mockito.verify(sessionHandler, times(1)).onNotifyUnsubscribeLiteCommand(eq(endpoints), eq(command0));
+        } finally {
+            // Do not leave pool threads behind, whether the assertions passed or not.
+            scheduler.shutdownNow();
+        }
+    }
+
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImplTest.java
new file mode 100644
index 00000000..fdb338cb
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImplTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+
+/**
+ * Argument-validation tests for {@link LitePushConsumerBuilderImpl}: every setter is exercised with an
+ * invalid value, and {@code build()} is exercised with one mandatory setting left out at a time.
+ */
+public class LitePushConsumerBuilderImplTest extends TestBase {
+
+    /** Creates a client configuration that only carries the fake endpoints. */
+    private ClientConfiguration fakeClientConfiguration() {
+        return ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBindTopicWithNull() {
+        new LitePushConsumerBuilderImpl().bindTopic(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBindTopicWithBlank() {
+        new LitePushConsumerBuilderImpl().bindTopic(" ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBindTopicWithEmpty() {
+        new LitePushConsumerBuilderImpl().bindTopic("");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSetClientConfigurationWithNull() {
+        new LitePushConsumerBuilderImpl().setClientConfiguration(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSetConsumerGroupWithNull() {
+        new LitePushConsumerBuilderImpl().setConsumerGroup(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSetMessageListenerWithNull() {
+        new LitePushConsumerBuilderImpl().setMessageListener(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNegativeMaxCacheMessageCount() {
+        new LitePushConsumerBuilderImpl().setMaxCacheMessageCount(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNegativeMaxCacheMessageSizeInBytes() {
+        new LitePushConsumerBuilderImpl().setMaxCacheMessageSizeInBytes(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNegativeConsumptionThreadCount() {
+        new LitePushConsumerBuilderImpl().setConsumptionThreadCount(-1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutClientConfiguration() throws ClientException {
+        // Client configuration deliberately left unset.
+        new LitePushConsumerBuilderImpl()
+            .setConsumerGroup(FAKE_CONSUMER_GROUP_0)
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .bindTopic("test-topic")
+            .build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutConsumerGroup() throws ClientException {
+        // Consumer group deliberately left unset.
+        new LitePushConsumerBuilderImpl()
+            .setClientConfiguration(fakeClientConfiguration())
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .bindTopic("test-topic")
+            .build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutMessageListener() throws ClientException {
+        // Message listener deliberately left unset.
+        new LitePushConsumerBuilderImpl()
+            .setClientConfiguration(fakeClientConfiguration())
+            .setConsumerGroup(FAKE_CONSUMER_GROUP_0)
+            .bindTopic("test-topic")
+            .build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutBindTopic() throws ClientException {
+        // Bind topic deliberately left unset.
+        new LitePushConsumerBuilderImpl()
+            .setClientConfiguration(fakeClientConfiguration())
+            .setConsumerGroup(FAKE_CONSUMER_GROUP_0)
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .build();
+    }
+
+}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
new file mode 100644
index 00000000..b71ab166
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import apache.rocketmq.v2.LiteSubscriptionAction;
+import com.google.common.util.concurrent.Futures;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for the lite subscribe/unsubscribe paths of {@link LitePushConsumerImpl}, driven through a consumer
+ * mock that calls real methods and a spied {@link LitePushConsumerSettings}.
+ */
+public class LitePushConsumerImplTest {
+
+    final String endpoints = "127.0.0.1:8080";
+
+    LitePushConsumerSettings spySettings;
+
+    private LitePushConsumerImpl consumer;
+
+    /**
+     * Creates the spied settings and the consumer mock, then wires the settings into the consumer.
+     */
+    @Before
+    public void setUp() {
+        final LitePushConsumerBuilderImpl builder = new LitePushConsumerBuilderImpl();
+        builder.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(endpoints).build());
+
+        spySettings = Mockito.spy(new LitePushConsumerSettings(builder, new ClientId(), new Endpoints(endpoints)));
+
+        MockitoAnnotations.openMocks(this);
+        consumer = mock(LitePushConsumerImpl.class, CALLS_REAL_METHODS);
+        injectSettings(consumer, spySettings);
+    }
+
+    /** Writes the settings into the consumer's final {@code litePushConsumerSettings} field via reflection. */
+    private static void injectSettings(LitePushConsumerImpl target, LitePushConsumerSettings settings) {
+        try {
+            final java.lang.reflect.Field settingsField =
+                LitePushConsumerImpl.class.getDeclaredField("litePushConsumerSettings");
+            settingsField.setAccessible(true);
+            settingsField.set(target, settings);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Asserts the number of lite topics currently held by the spied settings. */
+    private void assertLiteTopicCount(int expected) {
+        assertThat(spySettings.getLiteTopicSetSize()).isEqualTo(expected);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSubscribeLiteNotRunning() throws ClientException {
+        doThrow(new IllegalStateException("not running")).when(consumer).checkRunning();
+
+        consumer.subscribeLite("testLiteTopic");
+    }
+
+    @Test
+    public void testSubscribeLiteAlreadySubscribed() throws ClientException {
+        final String subscribedTopic = "testLiteTopic";
+        doNothing().when(consumer).checkRunning();
+        when(spySettings.containsLiteTopic(subscribedTopic)).thenReturn(true);
+
+        consumer.subscribeLite(subscribedTopic);
+
+        // The topic is reported as already present, so no sync must be attempted.
+        verify(consumer).checkRunning();
+        verify(spySettings).containsLiteTopic(subscribedTopic);
+        verify(consumer, never()).syncLiteSubscription(any(), any());
+    }
+
+    @Test
+    public void testSubscribeLiteQuotaExceededThenUnsubscribeAndSubscribeAgain() throws ClientException {
+        final String firstTopic = "testLiteTopic1";
+        final String secondTopic = "testLiteTopic2";
+        doNothing().when(consumer).checkRunning();
+        doReturn(Futures.immediateVoidFuture()).when(consumer)
+            .syncLiteSubscription(any(LiteSubscriptionAction.class), anyCollection());
+        when(spySettings.getLiteSubscriptionQuota()).thenReturn(1);
+
+        // The first subscription fits into the quota of one.
+        consumer.subscribeLite(firstTopic);
+        assertLiteTopicCount(1);
+
+        // The second one is rejected and leaves the topic set untouched.
+        assertThatThrownBy(() -> consumer.subscribeLite(secondTopic))
+            .isInstanceOf(LiteSubscriptionQuotaExceededException.class);
+        assertLiteTopicCount(1);
+
+        // Unsubscribing frees the slot again.
+        consumer.unsubscribeLite(firstTopic);
+        assertLiteTopicCount(0);
+
+        consumer.subscribeLite(secondTopic);
+        assertLiteTopicCount(1);
+
+        verify(spySettings, times(1)).addLiteTopic(firstTopic);
+        verify(spySettings, times(1)).removeLiteTopic(firstTopic);
+        verify(spySettings, times(1)).addLiteTopic(secondTopic);
+    }
+}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index d9aa61fb..e3ecfe46 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -92,7 +92,7 @@ public class ProcessQueueImplTest extends TestBase {
field1.setAccessible(true);
field1.set(pushConsumer, consumptionErrorQuantity);
-
when(pushConsumer.getPushConsumerSettings()).thenReturn(pushSubscriptionSettings);
+ when(pushConsumer.getSettings()).thenReturn(pushSubscriptionSettings);
when(pushConsumer.getScheduler()).thenReturn(SCHEDULER);
AtomicLong receivedMessagesQuantity = new AtomicLong(0);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index a771fc25..3ad28f95 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -29,9 +29,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Assert;
@@ -41,13 +41,19 @@ public class PushSubscriptionSettingsTest extends TestBase {
@Test
public void testToProtobuf() {
- Resource groupResource = new Resource(FAKE_NAMESPACE,
FAKE_CONSUMER_GROUP_0);
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setNamespace(FAKE_NAMESPACE)
+ .setRequestTimeout(requestTimeout)
+ .setEndpoints(FAKE_ENDPOINTS)
+ .build();
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
- final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
- fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(
+ clientConfiguration, clientId,
+ org.apache.rocketmq.client.java.impl.ClientType.PUSH_CONSUMER,
+ fakeEndpoints(), FAKE_CONSUMER_GROUP_0, subscriptionExpression);
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
@@ -72,15 +78,20 @@ public class PushSubscriptionSettingsTest extends TestBase {
@Test
public void testToProtobufWithSqlExpression() {
- Resource groupResource = new Resource(FAKE_NAMESPACE,
FAKE_CONSUMER_GROUP_0);
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setNamespace(FAKE_NAMESPACE)
+ .setRequestTimeout(requestTimeout)
+ .setEndpoints(FAKE_ENDPOINTS)
+ .build();
ClientId clientId = new ClientId();
-
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10
AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
- final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
- fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(
+ clientConfiguration, clientId,
+ org.apache.rocketmq.client.java.impl.ClientType.PUSH_CONSUMER,
+ fakeEndpoints(), FAKE_CONSUMER_GROUP_0, subscriptionExpression);
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
@@ -121,14 +132,20 @@ public class PushSubscriptionSettingsTest extends
TestBase {
Subscription subscription =
Subscription.newBuilder().setFifo(fifo).setReceiveBatchSize(receiveBatchSize)
.setLongPollingTimeout(longPollingTimeout).build();
Settings settings =
Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10
AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
- fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setNamespace(FAKE_NAMESPACE)
+ .setRequestTimeout(requestTimeout)
+ .setEndpoints(FAKE_ENDPOINTS)
+ .build();
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(
+ clientConfiguration, clientId,
+ org.apache.rocketmq.client.java.impl.ClientType.PUSH_CONSUMER,
+ fakeEndpoints(), FAKE_CONSUMER_GROUP_0, subscriptionExpression);
pushSubscriptionSettings.sync(settings);
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
index 2f1de06c..23f445c7 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
@@ -27,36 +27,146 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Test;
public class GeneralMessageImplTest extends TestBase {
+
@Test
- public void testMessage() {
+ public void testMessageTagKeysProperty() {
String topic = "testTopic";
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
String tag = "tagA";
+ String key1 = "keyA";
+ String key2 = "keyB";
+ String propertyKey1 = "propertyKey1";
+ String propertyValue1 = "propertyValue1";
+ String propertyKey2 = "propertyKey2";
+ String propertyValue2 = "propertyValue2";
+
List<String> keys = new ArrayList<>();
- keys.add("keyA");
- String messageGroup = "messageGroup0";
- long deliveryTimestamp = System.currentTimeMillis();
+ keys.add(key1);
+ keys.add(key2);
+
Map<String, String> properties = new HashMap<>();
- properties.put("propertyA", "valueA");
+ properties.put(propertyKey1, propertyValue1);
+ properties.put(propertyKey2, propertyValue2);
+
+ final Message message = new MessageBuilderImpl()
+ .setTopic(topic)
+ .setBody(body)
+ .setTag(tag)
+ .setKeys(key1, key2)
+ .addProperty(propertyKey1, propertyValue1)
+ .addProperty(propertyKey2, propertyValue2)
+ .build();
- final MessageImpl message = new MessageImpl(topic, body, tag, keys,
messageGroup, deliveryTimestamp,
- properties);
final GeneralMessageImpl generalMessage = new
GeneralMessageImpl(message);
assertFalse(generalMessage.getMessageId().isPresent());
assertEquals(topic, generalMessage.getTopic());
assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
- assertEquals(properties, generalMessage.getProperties());
assertTrue(generalMessage.getTag().isPresent());
assertEquals(tag, generalMessage.getTag().get());
assertEquals(keys, generalMessage.getKeys());
+ assertFalse(generalMessage.getBornHost().isPresent());
+ assertFalse(generalMessage.getBornTimestamp().isPresent());
+ assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+ assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+
assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+ assertFalse(generalMessage.getMessageGroup().isPresent());
+ assertFalse(generalMessage.getLiteTopic().isPresent());
+ assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+
+ // Verify properties
+ Map<String, String> messageProperties = generalMessage.getProperties();
+ assertEquals(properties.size(), messageProperties.size());
+ assertEquals(properties, messageProperties);
+ }
+
+ @Test
+ public void testMessageGroup() {
+ String topic = "testTopic";
+ byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+ String messageGroup = "messageGroup0";
+
+ final Message message = new MessageBuilderImpl()
+ .setTopic(topic)
+ .setBody(body)
+ .setMessageGroup(messageGroup)
+ .build();
+
+ final GeneralMessageImpl generalMessage = new
GeneralMessageImpl(message);
+ assertFalse(generalMessage.getMessageId().isPresent());
+ assertEquals(topic, generalMessage.getTopic());
+ assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+ assertFalse(generalMessage.getTag().isPresent());
+ assertEquals(0, generalMessage.getKeys().size());
assertTrue(generalMessage.getMessageGroup().isPresent());
assertEquals(messageGroup, generalMessage.getMessageGroup().get());
+ assertFalse(generalMessage.getBornHost().isPresent());
+ assertFalse(generalMessage.getBornTimestamp().isPresent());
+ assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+ assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+
assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+ assertFalse(generalMessage.getLiteTopic().isPresent());
+ assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+ }
+
+    /** A message built with only topic, body and lite topic exposes exactly those fields when wrapped. */
+    @Test
+    public void testMessageLiteTopic() {
+        final String expectedTopic = "testTopic";
+        final String expectedLiteTopic = "liteTopic0";
+        final byte[] payload = "foobar".getBytes(StandardCharsets.UTF_8);
+
+        final Message source = new MessageBuilderImpl()
+            .setTopic(expectedTopic)
+            .setBody(payload)
+            .setLiteTopic(expectedLiteTopic)
+            .build();
+        final GeneralMessageImpl wrapped = new GeneralMessageImpl(source);
+
+        assertFalse(wrapped.getMessageId().isPresent());
+        assertEquals(expectedTopic, wrapped.getTopic());
+        assertEquals(ByteBuffer.wrap(payload), wrapped.getBody());
+        assertFalse(wrapped.getTag().isPresent());
+        assertEquals(0, wrapped.getKeys().size());
+        assertFalse(wrapped.getMessageGroup().isPresent());
+
+        // The lite topic is the only optional field that was set.
+        assertTrue(wrapped.getLiteTopic().isPresent());
+        assertEquals(expectedLiteTopic, wrapped.getLiteTopic().get());
+
+        assertFalse(wrapped.getBornHost().isPresent());
+        assertFalse(wrapped.getBornTimestamp().isPresent());
+        assertFalse(wrapped.getDeliveryAttempt().isPresent());
+        assertFalse(wrapped.getDecodeTimestamp().isPresent());
+        assertFalse(wrapped.getTransportDeliveryTimestamp().isPresent());
+        assertFalse(wrapped.getDeliveryTimestamp().isPresent());
+    }
+
+ @Test
+ public void testMessageDeliveryTimestamp() {
+ String topic = "testTopic";
+ byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+ long deliveryTimestamp = System.currentTimeMillis();
+
+ final Message message = new MessageBuilderImpl()
+ .setTopic(topic)
+ .setBody(body)
+ .setDeliveryTimestamp(deliveryTimestamp)
+ .build();
+
+ final GeneralMessageImpl generalMessage = new
GeneralMessageImpl(message);
+ assertFalse(generalMessage.getMessageId().isPresent());
+ assertEquals(topic, generalMessage.getTopic());
+ assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+ assertFalse(generalMessage.getTag().isPresent());
+ assertEquals(0, generalMessage.getKeys().size());
+ assertFalse(generalMessage.getMessageGroup().isPresent());
+ assertFalse(generalMessage.getLiteTopic().isPresent());
assertTrue(generalMessage.getDeliveryTimestamp().isPresent());
assertEquals(deliveryTimestamp, (long)
generalMessage.getDeliveryTimestamp().get());
assertFalse(generalMessage.getBornHost().isPresent());
@@ -73,6 +183,7 @@ public class GeneralMessageImplTest extends TestBase {
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
String tag = "tagA";
String messageGroup = "messageGroup0";
+ String liteTopic = "liteTopic0";
long deliveryTimestamp = System.currentTimeMillis();
List<String> keys = new ArrayList<>();
keys.add("keyA");
@@ -87,7 +198,8 @@ public class GeneralMessageImplTest extends TestBase {
boolean corrupted = false;
long transportDeliveryTimestamp = System.currentTimeMillis();
- final MessageViewImpl messageView = new MessageViewImpl(messageId,
topic, body, tag, messageGroup,
+ final MessageViewImpl messageView = new MessageViewImpl(messageId,
topic, body, tag,
+ messageGroup, liteTopic,
deliveryTimestamp, keys, properties, bornHost, bornTimestamp,
deliveryAttempt, mq, receiptHandle,
offset, corrupted, transportDeliveryTimestamp);
final GeneralMessageImpl generalMessage = new
GeneralMessageImpl(messageView);
@@ -101,6 +213,8 @@ public class GeneralMessageImplTest extends TestBase {
assertEquals(keys, generalMessage.getKeys());
assertTrue(generalMessage.getMessageGroup().isPresent());
assertEquals(messageGroup, generalMessage.getMessageGroup().get());
+ assertTrue(generalMessage.getLiteTopic().isPresent());
+ assertEquals(liteTopic, generalMessage.getLiteTopic().get());
assertTrue(generalMessage.getDeliveryTimestamp().isPresent());
assertEquals(deliveryTimestamp, (long)
generalMessage.getDeliveryTimestamp().get());
assertTrue(generalMessage.getBornHost().isPresent());
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index a9a918fb..88797670 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -131,4 +131,21 @@ public class MessageImplTest extends TestBase {
assertFalse(message.getDeliveryTimestamp().isPresent());
assertFalse(message.getMessageGroup().isPresent());
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testLiteTopicSetterWithSpaces() {
+ provider.newMessageBuilder().setLiteTopic(" ");
+ }
+
+ @Test
+ public void testLiteTopicSetter() {
+ final Message message = provider.newMessageBuilder()
+ .setLiteTopic("liteTopicA")
+ .setTopic(FAKE_TOPIC_0)
+ .setBody(FAKE_MESSAGE_BODY)
+ .build();
+ assertTrue(message.getLiteTopic().isPresent());
+ assertEquals("liteTopicA", message.getLiteTopic().get());
+ }
+
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 7f80f155..1d7e7581 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -183,7 +183,7 @@ public class TestBase {
final byte[] body = RandomUtils.nextBytes(bodySize);
Map<String, String> properties = new HashMap<>();
List<String> keys = new ArrayList<>();
- return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null,
null,
+ return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null,
null, null,
keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, 1,
corrupted,
System.currentTimeMillis());
}
diff --git a/java/pom.xml b/java/pom.xml
index 7e9aac66..2ac39dc2 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -26,7 +26,7 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java-parent</artifactId>
<packaging>pom</packaging>
- <version>5.0.9-SNAPSHOT</version>
+ <version>5.1.0-SNAPSHOT</version>
<modules>
<module>client-apis</module>
<module>client</module>
@@ -48,7 +48,7 @@
~ 1. Whether it is essential, because the current shaded jar is
fat enough.
~ 2. Make sure that it is compatible with Java 8.
-->
- <rocketmq-proto.version>2.0.5</rocketmq-proto.version>
+ <rocketmq-proto.version>2.1.0</rocketmq-proto.version>
<annotations-api.version>1.3.5</annotations-api.version>
<protobuf.version>3.24.4</protobuf.version>
<grpc.version>1.50.0</grpc.version>
diff --git a/java/test/pom.xml b/java/test/pom.xml
index e834b6fb..78bff647 100644
--- a/java/test/pom.xml
+++ b/java/test/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java-parent</artifactId>
- <version>5.0.9-SNAPSHOT</version>
+ <version>5.1.0-SNAPSHOT</version>
</parent>
<artifactId>test</artifactId>