This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 0a6e3d920e808abb482989c7e1af39df61a96deb Author: Aaron Ai <[email protected]> AuthorDate: Sun Jun 19 13:55:47 2022 +0800 Java: add more docs --- .../rocketmq/client/apis/ClientConfiguration.java | 10 +++--- .../client/apis/ClientConfigurationBuilder.java | 6 ++++ .../apis/StaticSessionCredentialsProvider.java | 3 ++ .../client/apis/consumer/ConsumeResult.java | 3 ++ .../client/apis/consumer/FilterExpression.java | 8 +++++ .../client/apis/consumer/PushConsumer.java | 37 ++++++++++++++++------ .../client/apis/consumer/PushConsumerBuilder.java | 3 ++ .../client/apis/consumer/SimpleConsumer.java | 34 ++++++++++++-------- .../apis/consumer/SimpleConsumerBuilder.java | 3 ++ .../client/apis/producer/ProducerBuilder.java | 12 +++++++ .../apis/producer/TransactionResolution.java | 5 ++- .../rocketmq/client/java/impl/ClientImpl.java | 3 +- .../client/java/message/MessageIdCodec.java | 8 +++++ 13 files changed, 105 insertions(+), 30 deletions(-) diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java index c7006e4..620b71c 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java @@ -24,7 +24,7 @@ import java.util.Optional; * Common client configuration. */ public class ClientConfiguration { - private final String accessPoint; + private final String endpoints; private final SessionCredentialsProvider sessionCredentialsProvider; private final Duration requestTimeout; @@ -32,9 +32,9 @@ public class ClientConfiguration { * The caller is supposed to have validated the arguments and handled throwing exception or * logging warnings already, so we avoid repeating args check here. */ - ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider, + ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider, Duration requestTimeout) { - this.accessPoint = accessPoint; + this.endpoints = endpoints; this.sessionCredentialsProvider = sessionCredentialsProvider; this.requestTimeout = requestTimeout; } @@ -43,8 +43,8 @@ public class ClientConfiguration { return new ClientConfigurationBuilder(); } - public String getAccessPoint() { - return accessPoint; + public String getEndpoints() { + return endpoints; } public Optional<SessionCredentialsProvider> getCredentialsProvider() { diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java index 672826e..f5f5c20 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java @@ -41,6 +41,12 @@ public class ClientConfigurationBuilder { return this; } + /** + * Config the session credential provider. + * + * @param sessionCredentialsProvider session credential provider. + * @return the client configuration builder instance. + */ public ClientConfigurationBuilder setCredentialProvider(SessionCredentialsProvider sessionCredentialsProvider) { this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not " + "be null"); diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/StaticSessionCredentialsProvider.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/StaticSessionCredentialsProvider.java index 67cefc1..0c0a3cd 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/StaticSessionCredentialsProvider.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/StaticSessionCredentialsProvider.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.client.apis; +/** + * Static implementation of {@link SessionCredentialsProvider}, which means the credentials are immutable. + */ public class StaticSessionCredentialsProvider implements SessionCredentialsProvider { private final SessionCredentials credentials; diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/ConsumeResult.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/ConsumeResult.java index 946309a..dd7312f 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/ConsumeResult.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/ConsumeResult.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.client.apis.consumer; +/** + * Designed for push consumer specifically. + */ public enum ConsumeResult { /** * Consume message successfully. diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java index dcda10e..411c5e4 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java @@ -37,10 +37,18 @@ public class FilterExpression { this.filterExpressionType = checkNotNull(filterExpressionType, "filterExpressionType should not be null"); } + /** + * If the {@link FilterExpressionType} is not specified, the type is {@link FilterExpressionType#TAG}. + * + * @param expression tag filter expression. + */ public FilterExpression(String expression) { this(expression, FilterExpressionType.TAG); } + /** + * Default constructor, which means that messages are not filtered. + */ public FilterExpression() { this(TAG_EXPRESSION_SUB_ALL); } diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java index 70b7fce..51c07ea 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java @@ -23,18 +23,37 @@ import java.util.Map; import org.apache.rocketmq.client.apis.ClientException; /** - * PushConsumer is a thread-safe rocketmq client which is used to consume message by group. + * Push consumer is a thread-safe and full-managed rocketmq client which is used to consume message by group. * - * <p>Push consumer is fully-managed consumer, if you are confused to choose your consumer, push consumer should be - * your first consideration. + * <p>Consumers belong to the same consumer group share messages from server, which means they must have the same + * subscription expressions, otherwise the behavior is <strong>undefined</strong>. If a new consumer group's consumer + * is started first time, it consumes from the latest position. Once consumer is started, server records its + * consumption progress and derives it in subsequent startup, or we can call it clustering mode. * - * <p>Consumers belong to the same consumer group share messages from server, - * so consumer in the same group must have the same subscription expressions, otherwise the behavior is - * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once - * consumer is started, server records its consumption progress and derives it in subsequent startup. + * <h3>Clustering mode</h3> + * <pre> + * ┌──────────────────┐ ┌──────────┐ + * │consume progress 0│◄─┐ ┌─►│consumer A│ + * └──────────────────┘ │ │ └──────────┘ + * ├──┤ + * ┌─────────────────┐ │ │ ┌──────────┐ + * │topic X + group 0│◄─┘ └─►│consumer B│ + * └─────────────────┘ └──────────┘ + * </pre> * - * <p>You may intend to maintain different consumption progress for different consumer, different consumer group - * should be set in this case. + * <p>As for broadcasting mode, you may intend to maintain different consumption progress for different consumer, + * different consumer group should be set in this case. + * + * <h3>Broadcasting mode</h3> + * <pre> + * ┌──────────────────┐ ┌──────────┐ ┌──────────────────┐ + * │consume progress 0│◄─┬──┤consumer A│ ┌─►│consume progress 1│ + * └──────────────────┘ │ └──────────┘ │ └──────────────────┘ + * │ │ + * ┌─────────────────┐ │ ┌──────────┐ │ ┌─────────────────┐ + * │topic X + group 0│◄─┘ │consumer B├──┴─►│topic X + group 1│ + * └─────────────────┘ └──────────┘ └─────────────────┘ + * </pre> * * <p>To accelerate the message consumption, push consumer applies * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java index 0a5daca..dcd774f 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java @@ -21,6 +21,9 @@ import java.util.Map; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; +/** + * Builder to config and start {@link PushConsumer}. + */ public interface PushConsumerBuilder { /** * Set the client configuration for consumer. diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java index 90ab988..c931418 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java @@ -27,24 +27,32 @@ import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.message.MessageView; /** - * SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group. + * Simple consumer is a thread-safe rocketmq client which is used to consume message by group. * - * <p>Simple consumer is lightweight consumer , if you want fully control the message consumption operation by yourself, + * <p>Simple consumer is lightweight consumer, if you want fully control the message consumption operation by yourself, * simple consumer should be your first consideration. * - * <p>Consumers belong to the same consumer group share messages from server, - * so consumer in the same group must have the same subscription expressions, otherwise the behavior is - * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once - * consumer is started, server records its consumption progress and derives it in subsequent startup. + * <p>Similar to {@link PushConsumer}, consumers belong to the same consumer group share messages from server, which + * means they must have the same subscription expressions, otherwise the behavior is <strong>UNDEFINED</strong>. * - * <p>You may intend to maintain different consumption progress for different consumer, different consumer group - * should be set in this case. + * <p>In addition, the simple consumer can share a consumer group with the {@link PushConsumer}, at which time they + * share the common consumption progress. * - * <p> Simple consumer divide message consumption to 3 parts. - * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api - * to commit this message. - * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration - * parameter. Also, you can change the invisibleDuration by call changeInvisibleDuration api. + * <h3>Share consume progress with push consumer</h3> + * <pre> + * ┌──────────────────┐ ┌─────────────────┐ + * │consume progress 0│◄─┐ ┌─►│simple consumer A│ + * └──────────────────┘ │ │ └─────────────────┘ + * ├──┤ + * ┌─────────────────┐ │ │ ┌───────────────┐ + * │topic X + group 0│◄─┘ └─►│push consumer B│ + * └─────────────────┘ └───────────────┘ + * </pre> + * + * <p>Simple consumer divide message consumption to 3 phases. + * 1. Receive message from server. + * 2. Executes your operations after receiving message. + * 3. Acknowledge the message or change its invisible duration before next delivery according the operation result. */ public interface SimpleConsumer extends Closeable { /** diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java index 49a2804..9f86013 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java @@ -22,6 +22,9 @@ import java.util.Map; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; +/** + * Builder to config and start {@link SimpleConsumer}. + */ public interface SimpleConsumerBuilder { /** * Set the client configuration for simple consumer. diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java index 6f76034..4e9f34f 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java @@ -39,6 +39,18 @@ public interface ProducerBuilder { * <p>Even though the declaration is not essential, we <strong>highly recommend</strong> to declare the topics in * advance, which could help to discover potential mistakes. * + * <pre>{@code + * // Example 0: single topic. + * producerBuilder.setTopics("topicA"); + * // Example 1: multiple topics. + * producerBuilder.setTopics("topicA", "topicB"); + * // Example 2: multiple topics. + * ArrayList<String> topicList = new ArrayList<>(); + * topicList.add("topicA"); + * topicList.add("topicB"); + * producerBuilder.setTopics(topicList); + * }</pre> + * * @param topics topics to send/prepare. * @return the producer builder instance. */ diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/TransactionResolution.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/TransactionResolution.java index ab17cfc..00e6870 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/TransactionResolution.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/TransactionResolution.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.client.apis.producer; +/** + * Resolution of {@link Transaction}. + */ public enum TransactionResolution { /** * Notify server that current transaction should be committed. @@ -30,5 +33,5 @@ public enum TransactionResolution { * Notify server that the state of this transaction is not sure. You should be cautions before return unknown * because the examination from server will be performed periodically. */ - UNKNOWN; + UNKNOWN } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index b7b45fa..cfa6db2 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -123,8 +123,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) { this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null"); - final String accessPoint = clientConfiguration.getAccessPoint(); - this.accessEndpoints = new Endpoints(accessPoint); + this.accessEndpoints = new Endpoints(clientConfiguration.getEndpoints()); this.topics = topics; // Generate client id firstly. this.clientId = Utilities.genClientId(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java index 59d1578..5d313d8 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java @@ -35,6 +35,14 @@ import org.apache.rocketmq.client.java.misc.Utilities; * <p>The message id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version * number. For V1, these two bytes are 0x0001. * + * <h3>V1 message id example</h3> + * + * <pre> + * ┌──┬────────────┬────┬────────┬────────┐ + * │01│56F7E71C361B│21BC│024CCDBE│00000000│ + * └──┴────────────┴────┴────────┴────────┘ + * </pre> + * * <h3>V1 version message id generation rules</h3> * * <pre>
