This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f57efb82c2a7d960701ffd31746923d41738b865 Author: Zhongliang.Chen <[email protected]> AuthorDate: Sat Jun 18 05:16:35 2022 +0800 [RIP-37] Add new APIs for consumer --- .../main/java/client/apis/ClientConfiguration.java | 61 ++++++++ .../client/apis/ClientConfigurationBuilder.java | 72 ++++++++++ .../java/client/apis/ClientServiceProvider.java | 68 +++++++++ .../src/main/java/client/apis/MessageQueue.java | 24 ++++ .../main/java/client/apis/SessionCredentials.java | 58 ++++++++ .../client/apis/SessionCredentialsProvider.java | 30 ++++ .../apis/StaticSessionCredentialsProvider.java | 35 +++++ .../java/client/apis/consumer/ConsumeResult.java | 31 +++++ .../client/apis/consumer/FilterExpression.java | 56 ++++++++ .../client/apis/consumer/FilterExpressionType.java | 32 +++++ .../java/client/apis/consumer/MessageListener.java | 54 ++++++++ .../java/client/apis/consumer/PushConsumer.java | 94 +++++++++++++ .../client/apis/consumer/PushConsumerBuilder.java | 87 ++++++++++++ .../java/client/apis/consumer/SimpleConsumer.java | 154 +++++++++++++++++++++ .../apis/consumer/SimpleConsumerBuilder.java | 66 +++++++++ .../apis/exception/AuthenticationException.java | 31 +++++ .../apis/exception/AuthorisationException.java | 35 +++++ .../client/apis/exception/ClientException.java | 72 ++++++++++ .../main/java/client/apis/exception/ErrorCode.java | 84 +++++++++++ .../apis/exception/FlowControlException.java | 25 ++++ .../client/apis/exception/NetworkException.java | 28 ++++ .../exception/RemoteIllegalArgumentException.java | 25 ++++ .../apis/exception/ResourceNotFoundException.java | 25 ++++ .../apis/exception/ResourceNotMatchException.java | 24 ++++ .../client/apis/exception/TimeoutException.java | 28 ++++ .../src/main/java/client/apis/message/Message.java | 77 +++++++++++ .../java/client/apis/message/MessageBuilder.java | 91 ++++++++++++ .../main/java/client/apis/message/MessageId.java | 38 +++++ .../java/client/apis/message/MessageIdVersion.java | 29 ++++ .../main/java/client/apis/message/MessageView.java | 121 ++++++++++++++++ .../main/java/client/apis/producer/Producer.java | 100 +++++++++++++ .../java/client/apis/producer/ProducerBuilder.java | 82 +++++++++++ .../java/client/apis/producer/SendReceipt.java | 29 ++++ .../java/client/apis/producer/Transaction.java | 47 +++++++ .../client/apis/producer/TransactionChecker.java | 41 ++++++ .../apis/producer/TransactionResolution.java | 34 +++++ .../apis/retry/BackOffRetryPolicyBuilder.java | 59 ++++++++ .../java/client/apis/retry/BackoffRetryPolicy.java | 86 ++++++++++++ .../main/java/client/apis/retry/RetryPolicy.java | 40 ++++++ java/{ => client-java}/pom.xml | 13 +- java/pom.xml | 3 +- 41 files changed, 2181 insertions(+), 8 deletions(-) diff --git a/java/client-apis/src/main/java/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/client/apis/ClientConfiguration.java new file mode 100644 index 0000000..56a1b56 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/ClientConfiguration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +import java.time.Duration; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Common client configuration. + */ +public class ClientConfiguration { + private final String endpoints; + private final SessionCredentialsProvider sessionCredentialsProvider; + private final Duration requestTimeout; + private final boolean enableTracing; + + public static ClientConfigurationBuilder newBuilder() { + return new ClientConfigurationBuilder(); + } + + public ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider, + Duration requestTimeout, boolean enableTracing) { + this.endpoints = checkNotNull(endpoints, "endpoints should not be null"); + this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not be" + + " null"); + this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should be not null"); + this.enableTracing = enableTracing; + } + + public String getEndpoints() { + return endpoints; + } + + public SessionCredentialsProvider getCredentialsProvider() { + return sessionCredentialsProvider; + } + + public Duration getRequestTimeout() { + return requestTimeout; + } + + public boolean isEnableTracing() { + return enableTracing; + } +} diff --git a/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java new file mode 100644 index 0000000..94999bd --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +import java.time.Duration; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Builder to set {@link ClientConfiguration}. + */ +public class ClientConfigurationBuilder { + private String endpoints; + private SessionCredentialsProvider sessionCredentialsProvider; + private Duration requestTimeout; + private boolean enableTracing; + + /** + * Configure the endpoints with which the SDK should communicate. + * + * <p>Endpoints here means address of service, complying with the following scheme(part square brackets is + * optional). + * <p>1. DNS scheme(default): dns:host[:port], host is the host to resolve via DNS, port is the port to return + * for each address. If not specified, 443 is used. + * <p>2. ipv4 scheme: ipv4:address:port[,address:port,...] + * <p>3. ipv6 scheme: ipv6:address:port[,address:port,...] + * <p>4. http/https scheme: http|https://host[:port], similar to DNS scheme, if port not specified, 443 is used. + * + * @param endpoints address of service. + * @return the client configuration builder instance. + */ + public ClientConfigurationBuilder setEndpoints(String endpoints) { + checkNotNull(endpoints, "endpoints should not be not null"); + this.endpoints = endpoints; + return this; + } + + public ClientConfigurationBuilder setCredentialProvider(SessionCredentialsProvider sessionCredentialsProvider) { + this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not be " + + "null"); + return this; + } + + public ClientConfigurationBuilder setRequestTimeout(Duration requestTimeout) { + this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should not be not null"); + return this; + } + + public ClientConfigurationBuilder enableTracing(boolean enableTracing) { + this.enableTracing = enableTracing; + return this; + } + + public ClientConfiguration build() { + return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, enableTracing); + } +} diff --git a/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java b/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java new file mode 100644 index 0000000..1ec7522 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +import java.util.Iterator; +import java.util.ServiceLoader; +import org.apache.rocketmq.apis.consumer.PushConsumerBuilder; +import org.apache.rocketmq.apis.consumer.SimpleConsumerBuilder; +import org.apache.rocketmq.apis.message.MessageBuilder; +import org.apache.rocketmq.apis.producer.ProducerBuilder; + +/** + * Service provider to seek client, which load client according to + * <a href="https://en.wikipedia.org/wiki/Service_provider_interface">Java SPI mechanism</a>. + */ +public interface ClientServiceProvider { + static ClientServiceProvider loadService() { + final ServiceLoader<ClientServiceProvider> loaders = ServiceLoader.load(ClientServiceProvider.class); + final Iterator<ClientServiceProvider> iterators = loaders.iterator(); + if (iterators.hasNext()) { + return iterators.next(); + } + throw new UnsupportedOperationException("Client service provider not found"); + } + + /** + * Get the producer builder by current provider. + * + * @return the producer builder instance. + */ + ProducerBuilder newProducerBuilder(); + + /** + * Get the simple consumer builder by current provider. + * + * @return the simple consumer builder instance. + */ + SimpleConsumerBuilder newSimpleConsumerBuilder(); + + /** + * Get the push consumer builder by current provider. + * + * @return the push consumer builder instance. + */ + PushConsumerBuilder newPushConsumerBuilder(); + + /** + * Get the message builder by current provider. + * + * @return the message builder instance. + */ + MessageBuilder newMessageBuilder(); +} diff --git a/java/client-apis/src/main/java/client/apis/MessageQueue.java b/java/client-apis/src/main/java/client/apis/MessageQueue.java new file mode 100644 index 0000000..8c722e2 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/MessageQueue.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +public interface MessageQueue { + String getTopic(); + + String getId(); +} \ No newline at end of file diff --git a/java/client-apis/src/main/java/client/apis/SessionCredentials.java b/java/client-apis/src/main/java/client/apis/SessionCredentials.java new file mode 100644 index 0000000..adef427 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/SessionCredentials.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Session credentials used in service authentications. + */ +public class SessionCredentials { + private final String accessKey; + private final String accessSecret; + private final String securityToken; + + public SessionCredentials(String accessKey, String accessSecret, String securityToken) { + this.accessKey = checkNotNull(accessKey, "accessKey should not be null"); + this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null"); + this.securityToken = checkNotNull(securityToken, "securityToken should not be null"); + } + + public SessionCredentials(String accessKey, String accessSecret) { + this.accessKey = checkNotNull(accessKey, "accessKey should not be null"); + this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null"); + this.securityToken = null; + } + + public String getAccessKey() { + return accessKey; + } + + public String getAccessSecret() { + return accessSecret; + } + + public Optional<String> getSecurityToken() { + if (null == securityToken) { + return Optional.empty(); + } + return Optional.of(securityToken); + } +} diff --git a/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java b/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java new file mode 100644 index 0000000..52e6912 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +/** + * Abstract provider to provide {@link SessionCredentials}. + */ +public interface SessionCredentialsProvider { + /** + * Get the provided credentials. + * + * @return provided credentials. + */ + SessionCredentials getCredentials(); +} diff --git a/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java b/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java new file mode 100644 index 0000000..5c0d46c --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis; + +public class StaticSessionCredentialsProvider implements SessionCredentialsProvider { + private final SessionCredentials credentials; + + public StaticSessionCredentialsProvider(String accessKey, String accessSecret) { + this.credentials = new SessionCredentials(accessKey, accessSecret); + } + + public StaticSessionCredentialsProvider(String accessKey, String accessSecret, String securityToken) { + this.credentials = new SessionCredentials(accessKey, accessSecret, securityToken); + } + + @Override + public SessionCredentials getCredentials() { + return credentials; + } +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java b/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java new file mode 100644 index 0000000..b62939b --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package client.apis.consumer; + +public enum ConsumeResult { + /** + * Consume message success and need commit this message. + */ + SUCCESS, + + /** + * Failed to consume the message, expecting potential delivery after configured backoff. + */ + FAILURE +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java new file mode 100644 index 0000000..7f76ba7 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package client.apis.consumer; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class FilterExpression { + public static final String TAG_EXPRESSION_SUB_ALL = "*"; + public static final String TAG_EXPRESSION_SPLITTER = "\\|\\|"; + private final String expression; + private final FilterExpressionType filterExpressionType; + + public FilterExpression(String expression, FilterExpressionType filterExpressionType) { + this.expression = checkNotNull(expression, "expression should not be null"); + this.filterExpressionType = checkNotNull(filterExpressionType, "filterExpressionType should not be null"); + } + + public String getExpression() { + return expression; + } + + public FilterExpressionType getFilterExpressionType() { + return filterExpressionType; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FilterExpression that = (FilterExpression) o; + return expression.equals(that.expression) && filterExpressionType == that.filterExpressionType; + } + + @Override + public int hashCode() { + return Objects.hash(expression, filterExpressionType); + } +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java b/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java new file mode 100644 index 0000000..1284bfd --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package client.apis.consumer; + +public enum FilterExpressionType { + /** + * Follows SQL92 standard. + */ + SQL92, + /** + * Only support or operation such as + * "tag1 || tag2 || tag3", <br> + * If null or * expression,meaning subscribe all. + */ + TAG +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java new file mode 100644 index 0000000..2e74a4d --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.consumer; + +import org.apache.rocketmq.apis.message.MessageView; + +/** + * <p>MessageListener is used only by PushConsumer to process messages + * synchronously. + * + * <p>PushConsumer will fetch messages from brokers and dispatch them to an + * embedded thread pool in form of <code>Runnable</code> tasks to achieve desirable processing concurrency. + * + * <p>Refer to {@link PushConsumer} for more further specs. + * + * <p> + * <strong>Thread Safety</strong> + * This class may be called concurrently by multiple threads. Implementation should be thread safe. + * </p> + */ +public interface MessageListener { + + /** + * Callback interface to handle incoming messages. + * + * Application developers are expected to implement this interface to fulfill business requirements through + * processing <code>message</code> and return + * <code>ConsumeResult</code> accordingly. + * + * PushConsumer will, on behalf of its group, acknowledge the message to broker on success; In case of failure or + * unexpected exceptions were raised, it will negatively acknowledge <code>message</code>, which would potentially + * get re-delivered after the configured back off period. + * + * @param message The message passed to the listener. + * @return {@link ConsumeResult#SUCCESS} if <code>message</code> is properly processed; {@link + * ConsumeResult#FAILURE} otherwise. + */ + ConsumeResult onMessage(MessageView message); +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java b/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java new file mode 100644 index 0000000..6b7c1a4 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.consumer; + +import java.io.Closeable; +import java.util.Map; +import org.apache.rocketmq.apis.exception.ClientException; + + +/** + * PushConsumer is a managed client which delivers messages to application through {@link MessageListener}. + * + * <p>Consumers of the same group are designed to share messages from broker servers. As a result, consumers of the same + * group must have <strong>exactly identical subscription expressions</strong>, otherwise the behavior is undefined. + * + * <p>For a brand-new group, consumers consume messages from head of underlying queues, ignoring existing messages + * completely. In addition to delivering messages to clients, broker servers also maintain progress in perspective of + * group. Thus, consumers can safely restart and resume their progress automatically.</p> + * + * <p>There are scenarios where <a href="https://en.wikipedia.org/wiki/Fan-out_(software)">fan-out</a> is preferred, + * recommended solution is to use dedicated group of each client. + * + * <p>To mitigate latency, PushConsumer adopts + * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> pattern. Namely, + * messages received from broker servers are first cached locally, amount of which is controlled by + * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and + * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to thread pool to achieve + * desirable concurrency. + */ +public interface PushConsumer extends Closeable { + /** + * Get the load balancing group for consumer. + * + * @return consumer load balancing group. + */ + String getConsumerGroup(); + + /** + * List the existed subscription expressions in push consumer. + * + * @return map of topic to filter expression. + */ + Map<String, FilterExpression> subscriptionExpressions(); + + /** + * Add subscription expression dynamically. + * + * <p>If first subscriptionExpression that contains topicA and tag1 is exists already in consumer, then + * second subscriptionExpression which contains topicA and tag2, <strong>the result is that the second one + * replaces the first one instead of integrating them</strong>. + * + * @param topic new topic that need to add or update. + * @param filterExpression new filter expression to add or update. + * @return push consumer instance. + */ + PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException; + + /** + * Remove subscription expression dynamically by topic. + * + * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic + * was removed before would not be delivered to {@link MessageListener} anymore. + * + * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer. + * + * @param topic the topic to remove subscription. + * @return push consumer instance. + */ + PushConsumer unsubscribe(String topic) throws ClientException; + + /** + * Close the push consumer and release all related resources. + * + * <p>Once push consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM + * (finite-state machine) to record the different states for each producer, which is similar to + */ + @Override + void close(); +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java new file mode 100644 index 0000000..82bfd2f --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.consumer; + +import java.util.Map; +import org.apache.rocketmq.apis.ClientConfiguration; +import org.apache.rocketmq.apis.exception.ClientException; + +public interface PushConsumerBuilder { + /** + * Set the client configuration for consumer. + * + * @param clientConfiguration client's configuration. + * @return the consumer builder instance. + */ + PushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); + + /** + * Set the load balancing group for consumer. + * + * @param consumerGroup consumer load balancing group. + * @return the consumer builder instance. + */ + PushConsumerBuilder setConsumerGroup(String consumerGroup); + + /** + * Add subscriptionExpressions for consumer. + * + * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression. + * @return the consumer builder instance. + */ + PushConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions); + + /** + * Register message listener, all messages meet the subscription expression would across listener here. + * + * @param listener message listener. + * @return the consumer builder instance. + */ + PushConsumerBuilder setMessageListener(MessageListener listener); + + /** + * Set the maximum number of messages cached locally. + * + * @param count message count. + * @return the consumer builder instance. + */ + PushConsumerBuilder setMaxCacheMessageCount(int count); + + /** + * Set the maximum bytes of messages cached locally. + * + * @param bytes message size. + * @return the consumer builder instance. + */ + PushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes); + + /** + * Set the consumption thread count in parallel. + * + * @param count thread count. + * @return the consumer builder instance. + */ + PushConsumerBuilder setThreadCount(int count); + + /** + * Finalize the build of {@link PushConsumer}. + * + * @return the push consumer instance. + */ + PushConsumer build() throws ClientException; +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java new file mode 100644 index 0000000..4bc3827 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.consumer; + +import java.io.Closeable; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.apis.exception.ClientException; +import org.apache.rocketmq.apis.message.MessageView; + +/** + * SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group. + * + * <p>Simple consumer is lightweight consumer , if you want fully control the message consumption operation by yourself, + * simple consumer should be your first consideration. + * + * <p>Consumers belong to the same consumer group share messages from server, + * so consumer in the same group must have the same subscriptionExpressions, otherwise the behavior is + * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once + * consumer is started, server records its consumption progress and derives it in subsequent startup. + * + * <p>You may intend to maintain different consumption progress for different consumer, different consumer group + * should be set in this case. + * + * <p> Simple consumer divide message consumption to 3 parts. + * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api to commit this message. + * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration parameter. + * Also, you can change the invisibleDuration by call changeInvisibleDuration api. + */ +public interface SimpleConsumer extends Closeable { + /** + * Get the load balancing group for simple consumer. + * + * @return consumer load balancing group. + */ + String getConsumerGroup(); + + /** + * Add subscription expression dynamically. + * + * <p>If first subscriptionExpression that contains topicA and tag1 is exists already in consumer, then + * second subscriptionExpression which contains topicA and tag2, <strong>the result is that the second one + * replaces the first one instead of integrating them</strong>. + * + * @param topic new topic that need to add or update. + * @param filterExpression new filter expression to add or update. + * @return simple consumer instance. + */ + SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException; + + /** + * Remove subscription expression dynamically by topic. + * + * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic + * was removed before would not be delivered to {@link MessageListener} anymore. + * + * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer. + * + * @param topic the topic to remove subscription. + * @return simple consumer instance. + */ + SimpleConsumer unsubscribe(String topic) throws ClientException; + + /** + * List the existed subscription expressions in simple consumer. + * + * @return map of topic to filter expression. + */ + Map<String, FilterExpression> subscriptionExpressions(); + + /** + * Fetch messages from server synchronously. + * <p> This method returns immediately if there are messages available. + * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned. + * @param maxMessageNum max message num when server returns. + * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout. + * @return list of messageView + */ + List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException; + + /** + * Fetch messages from server asynchronously. + * <p> This method returns immediately if there are messages available. + * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned. + * @param maxMessageNum max message num when server returns. + * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout. + * @return list of messageView + */ + CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException; + + /** + * Ack message to server synchronously, server commit this message. + * + * <p> Duplicate ack request does not take effect and throw exception. + * @param messageView special messageView with handle want to ack. + */ + void ack(MessageView messageView) throws ClientException; + + /** + * Ack message to server asynchronously, server commit this message. + * + * <p> Duplicate ack request does not take effect and throw exception. + * @param messageView special messageView with handle want to ack. + * @return CompletableFuture of this request. + */ + CompletableFuture<Void> ackAsync(MessageView messageView); + + /** + * Changes the invisible duration of a specified message synchronously. + * + * <p> The origin invisible duration for a message decide by ack request. + * + * <p>You must call change request before the origin invisible duration timeout. + * If called change request later than the origin invisible duration, this request does not take effect and throw exception. + * Duplicate change request will refresh the next visible time of this message to other consumers. + * @param messageView special messageView with handle want to change. + * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time. + */ + void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException; + + /** + * Changes the invisible duration of a specified message asynchronously. + * + * <p> The origin invisible duration for a message decide by ack request. + * + * <p> You must call change request before the origin invisible duration timeout. + * If called change request later than the origin invisible duration, this request does not take effect and throw exception. + * Duplicate change request will refresh the next visible time of this message to other consumers. + * @param messageView special messageView with handle want to change. + * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time. + * @return CompletableFuture of this request. + */ + CompletableFuture<Void> changeInvisibleDurationAsync(MessageView messageView, Duration invisibleDuration); + + @Override + void close(); +} diff --git a/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java new file mode 100644 index 0000000..0760b0e --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.consumer; + +import java.time.Duration; +import java.util.Map; +import org.apache.rocketmq.apis.ClientConfiguration; +import org.apache.rocketmq.apis.exception.ClientException; + +public interface SimpleConsumerBuilder { + /** + * Set the client configuration for simple consumer. + * + * @param clientConfiguration client's configuration. + * @return the simple consumer builder instance. + */ + SimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); + + /** + * Set the load balancing group for simple consumer. + * + * @param consumerGroup consumer load balancing group. + * @return the consumer builder instance. + */ + SimpleConsumerBuilder setConsumerGroup(String consumerGroup); + + /** + * Add subscriptionExpressions for simple consumer. + * + * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression. + * @return the consumer builder instance. + */ + SimpleConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions); + + /** + * Set the max await time when receive message from server. + * The simple consumer will hold this long-polling receive requests until a message is returned or a timeout occurs. + * @param awaitDuration The maximum time to block when no message available. + * @return the consumer builder instance. + */ + SimpleConsumerBuilder setAwaitDuration(Duration awaitDuration); + + /** + * Finalize the build of the {@link SimpleConsumer} instance and start. + * + * <p>This method will block until simple consumer starts successfully. + * + * @return the simple consumer instance. + */ + SimpleConsumer build() throws ClientException; +} diff --git a/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java b/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java new file mode 100644 index 0000000..bfe7444 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +/** + * The difference between {@link AuthorisationException} and {@link AuthenticationException} is that + * {@link AuthenticationException} here means current user's identity could not be recognized. + * + * <p>For example, {@link AuthenticationException} will be thrown if access key is invalid. + */ +public class AuthenticationException extends ClientException { + public AuthenticationException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java b/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java new file mode 100644 index 0000000..cc3a93f --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +import org.apache.rocketmq.apis.message.Message; +import org.apache.rocketmq.apis.producer.Producer; + +/** + * The difference between {@link AuthenticationException} and {@link AuthorisationException} is that + * {@link AuthorisationException} here means current users don't have permission to do current operation. + * + * <p>For example, current user is forbidden to send message to this topic, {@link AuthorisationException} will be + * thrown in {@link Producer#send(Message)}. + */ +public class AuthorisationException extends ClientException { + public AuthorisationException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/ClientException.java b/java/client-apis/src/main/java/client/apis/exception/ClientException.java new file mode 100644 index 0000000..018b4da --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/ClientException.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +import com.google.common.base.MoreObjects; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Base exception for all exception raised in client, each exception should derive from current class. + * It should throw exception which is derived from {@link ClientException} rather than {@link ClientException} itself. + */ +public abstract class ClientException extends Exception { + /** + * For those {@link ClientException} along with a remote procedure call, request id could be used to track the + * request. + */ + protected static final String REQUEST_ID_KEY = "request-id"; + + private final ErrorCode errorCode; + private final Map<String, String> context; + + ClientException(ErrorCode errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + this.context = new HashMap<>(); + } + + ClientException(ErrorCode errorCode, String message) { + super(message); + this.errorCode = errorCode; + this.context = new HashMap<>(); + } + + @SuppressWarnings("SameParameterValue") + protected void putMetadata(String key, String value) { + context.put(key, value); + } + + public Optional<String> getRequestId() { + final String requestId = context.get(REQUEST_ID_KEY); + return null == requestId ? Optional.empty() : Optional.of(requestId); + } + + public ErrorCode getErrorCode() { + return errorCode; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(super.toString()) + .add("errorCode", errorCode) + .add("context", context) + .toString(); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java b/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java new file mode 100644 index 0000000..1d2895e --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +/** + * Indicates the reason why the exception is thrown, it can be easily divided into the following categories. + * + * <blockquote> + * + * <table> + * <caption>Error Categories and Exceptions</caption> + * <tr> + * <th>Category + * <th>Exception + * <th>Code range + * <tr> + * <td>Illegal client argument + * <td>{@link RemoteIllegalArgumentException} + * <p>{@link IllegalArgumentException} + * <td>{@code [101..199]} + * <tr> + * <td>Authorisation failure + * <td>{@link AuthorisationException} + * <td>{@code [201..299]} + * <tr> + * <td>Resource not found + * <td>{@link ResourceNotFoundException} + * <td>{@code [301..399]} + * </table> + * + * </blockquote> + */ +public enum ErrorCode { + /** + * Format of topic is illegal. + */ + INVALID_TOPIC(101), + /** + * Format of consumer group is illegal. + */ + INVALID_CONSUMER_GROUP(102), + /** + * Message is forbidden to publish. + */ + MESSAGE_PUBLISH_FORBIDDEN(201), + /** + * Topic does not exist. + */ + TOPIC_DOES_NOT_EXIST(301), + /** + * Consumer group does not exist. + */ + CONSUMER_GROUP_DOES_NOT_EXIST(302); + + private final int code; + + ErrorCode(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + @Override + public String toString() { + return String.valueOf(code); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java b/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java new file mode 100644 index 0000000..ed177c0 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +public class FlowControlException extends ClientException { + public FlowControlException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/NetworkException.java b/java/client-apis/src/main/java/client/apis/exception/NetworkException.java new file mode 100644 index 0000000..aa5af7b --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/NetworkException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +public class NetworkException extends ClientException { + public NetworkException(ErrorCode code, String message, Throwable cause) { + super(code, message, cause); + } + + public NetworkException(ErrorCode code, String message) { + super(code, message); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java b/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java new file mode 100644 index 0000000..3d456d7 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +public class RemoteIllegalArgumentException extends ClientException { + public RemoteIllegalArgumentException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} \ No newline at end of file diff --git a/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java b/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java new file mode 100644 index 0000000..19d032a --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +public class ResourceNotFoundException extends ClientException { + public ResourceNotFoundException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java b/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java new file mode 100644 index 0000000..54aa66a --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +public class ResourceNotMatchException extends ClientException { + public ResourceNotMatchException(ErrorCode code, String message) { + super(code, message); + } +} diff --git a/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java b/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java new file mode 100644 index 0000000..6c16b1a --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.exception; + +public class TimeoutException extends ClientException { + public TimeoutException(ErrorCode code, String message, Throwable cause) { + super(code, message, cause); + } + + public TimeoutException(ErrorCode code, String message) { + super(code, message); + } +} diff --git a/java/client-apis/src/main/java/client/apis/message/Message.java b/java/client-apis/src/main/java/client/apis/message/Message.java new file mode 100644 index 0000000..7504de9 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/message/Message.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.message; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.rocketmq.apis.producer.Producer; + +/** + * Abstract message only used for {@link Producer}. + */ +public interface Message { + /** + * Get the topic of message, which is the first classifier for message. + * + * @return topic of message. + */ + String getTopic(); + + /** + * Get the <strong>deep copy</strong> of message body. + * + * @return the <strong>deep copy</strong> of message body. + */ + byte[] getBody(); + + /** + * Get the <strong>deep copy</strong> of message properties. + * + * @return the <strong>deep copy</strong> of message properties. + */ + Map<String, String> getProperties(); + + /** + * Get the tag of message, which is the second classifier besides topic. + * + * @return the tag of message. + */ + Optional<String> getTag(); + + /** + * Get the key collection of message. + * + * @return <strong>the key collection</strong> of message. + */ + Collection<String> getKeys(); + + /** + * Get the message group, which make sense only when topic type is fifo. + * + * @return message group, which is optional. + */ + Optional<String> getMessageGroup(); + + /** + * Get the expected delivery timestamp, which make sense only when topic type is delay. + * + * @return message expected delivery timestamp, which is optional. + */ + Optional<Long> getDeliveryTimestamp(); +} diff --git a/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java b/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java new file mode 100644 index 0000000..4db9805 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.message; + +import java.util.Collection; + +/** + * Builder to config {@link Message}. + */ +public interface MessageBuilder { + /** + * Set the topic for message. + * + * @param topic topic for the message. + * @return the message builder instance. + */ + MessageBuilder setTopic(String topic); + + /** + * Set the body for message. + * + * @param body body for the message. + * @return the message builder instance. + */ + MessageBuilder setBody(byte[] body); + + /** + * Set the tag for message. + * + * @param tag tag for the message. + * @return the message builder instance. + */ + MessageBuilder setTag(String tag); + + /** + * Set the key for message. + * + * @param key key for the message. + * @return the message builder instance. + */ + MessageBuilder setKey(String key); + + /** + * Set the key collection for message. + * + * @param keys key collection for the message. + * @return the message builder instance. + */ + MessageBuilder setKeys(Collection<String> keys); + + /** + * Set the group for message. + * + * @param messageGroup group for the message. + * @return the message builder instance. + */ + MessageBuilder setMessageGroup(String messageGroup); + + /** + * Add user property for message. + * + * @param key single property key. + * @param value single property value. + * @return the message builder instance. + */ + MessageBuilder addProperty(String key, String value); + + /** + * Finalize the build of the {@link Message} instance. + * + * <p>Unique {@link MessageId} is generated after message building.</p> + * + * @return the message instance. + */ + Message build(); +} diff --git a/java/client-apis/src/main/java/client/apis/message/MessageId.java b/java/client-apis/src/main/java/client/apis/message/MessageId.java new file mode 100644 index 0000000..7b396ba --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/message/MessageId.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.message; + +/** + * Abstract message id, the implement must override {@link Object#toString()}, which indicates the message id using + * string form. + */ +public interface MessageId { + /** + * Get the version of message id. + * + * @return the version of message id. + */ + MessageIdVersion getVersion(); + + /** + * The implementation <strong>must</strong> override this method, which indicates the message id using string form. + * + * @return string-formed string id. + */ + String toString(); +} diff --git a/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java b/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java new file mode 100644 index 0000000..ab3fc57 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.message; + +public enum MessageIdVersion { + /** + * V0 version, whose length is 32. + */ + V0, + /** + * V1 version, whose length is 34 and begins with "01". + */ + V1 +} diff --git a/java/client-apis/src/main/java/client/apis/message/MessageView.java b/java/client-apis/src/main/java/client/apis/message/MessageView.java new file mode 100644 index 0000000..3f576a4 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/message/MessageView.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.message; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.rocketmq.apis.MessageQueue; + +/** + * {@link MessageView} provides a read-only view for message, that's why setters do not exist here. In addition, + * it only makes sense when {@link Message} is sent successfully, or it could be considered as a return receipt + * for producer/consumer. + */ +public interface MessageView { + /** + * Get the unique id of message. + * + * @return unique id. + */ + MessageId getMessageId(); + + /** + * Get the topic of message. + * + * @return topic of message. + */ + String getTopic(); + + /** + * Get the <strong>deep copy</strong> of message body, which makes the modification of return value does not + * affect the message itself. + * + * @return the <strong>deep copy</strong> of message body. + */ + byte[] getBody(); + + /** + * Get the <strong>deep copy</strong> of message properties, which makes the modification of return value does + * not affect the message itself. + * + * @return the <strong>deep copy</strong> of message properties. + */ + Map<String, String> getProperties(); + + /** + * Get the tag of message, which is optional. + * + * @return the tag of message, which is optional. + */ + Optional<String> getTag(); + + /** + * Get the key collection of message. + * + * @return <strong>the key collection</strong> of message. + */ + Collection<String> getKeys(); + + /** + * Get the message group, which is optional and only make sense only when topic type is fifo. + * + * @return message group, which is optional. + */ + Optional<String> getMessageGroup(); + + /** + * Get the expected delivery timestamp, which make sense only when topic type is delay. + * + * @return message expected delivery timestamp, which is optional. + */ + Optional<Long> getDeliveryTimestamp(); + + /** + * Get the born host of message. + * + * @return born host of message. + */ + String getBornHost(); + + /** + * Get the born timestamp of message. + * + * @return born timestamp of message. + */ + long getBornTimestamp(); + + /** + * Get the delivery attempt for message. + * + * @return delivery attempt. + */ + int getDeliveryAttempt(); + + /** + * Get the {@link MessageQueue} of message. + * + * @return message queue. + */ + MessageQueue getMessageQueue(); + + /** + * Get the position of message in {@link MessageQueue}. + */ + long getOffset(); +} diff --git a/java/client-apis/src/main/java/client/apis/producer/Producer.java b/java/client-apis/src/main/java/client/apis/producer/Producer.java new file mode 100644 index 0000000..a973635 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/producer/Producer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.producer; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.apis.exception.ClientException; +import org.apache.rocketmq.apis.message.Message; + +/** + * Producer is a thread-safe rocketmq client which is used to publish messages. + * + * <p>On account of network timeout or other reasons, rocketmq producer only promised the at-least-once semantics. + * For producer, at-least-once semantics means potentially attempts are made at sending it, messages may be + * duplicated but not lost. + */ +public interface Producer extends Closeable { + /** + * Sends a message synchronously. + * + * <p>This method does not return until it gets the definitive result. + * + * @param message message to send. + */ + SendReceipt send(Message message) throws ClientException; + + /** + * Sends a transactional message synchronously. + * + * @param message message to send. + * @param transaction transaction to bind. + * @return the message id assigned to the appointed message. + */ + SendReceipt send(Message message, Transaction transaction) throws ClientException; + + /** + * Sends a message asynchronously. + * + * <p>This method returns immediately, the result is included in the {@link CompletableFuture}; + * + * @param message message to send. + * @return a future that indicates the result. + */ + CompletableFuture<SendReceipt> sendAsync(Message message); + + /** + * Sends batch messages synchronously. + * + * <p>This method does not return until it gets the definitive result. + * + * <p>All messages to send should have the same topic. + * + * @param messages batch messages to send. + * @return collection indicates the message id assigned to the appointed message, which keep the same order + * messages collection. + */ + List<SendReceipt> send(List<Message> messages) throws ClientException; + + /** + * Begins a transaction. + * + * <p>For example: + * + * <pre>{@code + * Transaction transaction = producer.beginTransaction(); + * SendReceipt receipt1 = producer.send(message1, transaction); + * SendReceipt receipt2 = producer.send(message2, transaction); + * transaction.commit(); + * }</pre> + * + * @return a transaction entity to execute commit/rollback operation. + */ + Transaction beginTransaction() throws ClientException; + + /** + * Close the producer and release all related resources. + * + * <p>This method does not return until all related resource is released. Once producer is closed, <strong>it could + * not be started once again.</strong> we maintained an FSM (finite-state machine) to record the different states + * for each producer. + */ + @Override + void close(); +} diff --git a/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java b/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java new file mode 100644 index 0000000..31a79db --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.producer; + +import org.apache.rocketmq.apis.ClientConfiguration; +import org.apache.rocketmq.apis.exception.ClientException; +import org.apache.rocketmq.apis.message.Message; +import org.apache.rocketmq.apis.retry.BackoffRetryPolicy; + +/** + * Builder to config and start {@link Producer}. + */ +public interface ProducerBuilder { + /** + * Set the client configuration for producer. + * + * @param clientConfiguration client's configuration. + * @return the producer builder instance. + */ + ProducerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); + + /** + * Declare topics ahead of message sending/preparation. + * + * <p>Even though the declaration is not essential, we <strong>highly recommend</strong> to declare the topics in + * advance, which could help to discover potential mistakes. + * + * @param topics topics to send/prepare. + * @return the producer builder instance. + */ + ProducerBuilder setTopics(String... topics); + + /** + * Set the threads count for {@link Producer#sendAsync(Message)}. + * + * @return the producer builder instance. + */ + ProducerBuilder setAsyncThreadCount(int count); + + /** + * Set the retry policy to send message. + * + * @param retryPolicy policy to re-send message when failure encountered. + * @return the producer builder instance. + */ + ProducerBuilder setRetryPolicy(BackoffRetryPolicy retryPolicy); + + /** + * Set the transaction checker for producer. + * + * @param checker transaction checker. + * @return the produce builder instance. + */ + ProducerBuilder setTransactionChecker(TransactionChecker checker); + + /** + * Finalize the build of {@link Producer} instance and start. + * + * <p>The producer does a series of preparatory work during startup, which could help to identify more unexpected + * error earlier. + * + * <p>Especially, if this method is invoked more than once, different producer will be created and started. + * + * @return the producer instance. + */ + Producer build() throws ClientException; +} diff --git a/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java b/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java new file mode 100644 index 0000000..8340ab2 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.producer; + +import org.apache.rocketmq.apis.MessageQueue; +import org.apache.rocketmq.apis.message.MessageId; + +public interface SendReceipt { + MessageId getMessageId(); + + MessageQueue getMessageQueue(); + + long getOffset(); +} diff --git a/java/client-apis/src/main/java/client/apis/producer/Transaction.java b/java/client-apis/src/main/java/client/apis/producer/Transaction.java new file mode 100644 index 0000000..e2df718 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/producer/Transaction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.producer; + +import org.apache.rocketmq.apis.exception.ClientException; + +/** + * An entity to describe an independent transaction. + * + * <p>Once request of commit of roll-back reached server, subsequent arrived commit or roll-back request in + * {@link Transaction} would be ignored by server. + * + * <p>If transaction is not commit/roll-back in time, it is suspended until it is solved by {@link TransactionChecker} + * or reach the end of life. + */ +public interface Transaction { + /** + * Try to commit the transaction, which would expose the message before the transaction is closed if no exception + * thrown. + * + * <p>What you should pay more attention is that commit may be successful even exception thrown. + */ + void commit() throws ClientException; + + /** + * Try to roll back the transaction, which would expose the message before the transaction is closed if no exception + * thrown. + * + * <p>What you should pay more attention is that roll-back may be successful even exception thrown. + */ + void rollback() throws ClientException; +} \ No newline at end of file diff --git a/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java b/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java new file mode 100644 index 0000000..5050469 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.producer; + +import org.apache.rocketmq.apis.message.MessageView; + +/** + * Used to determine {@link TransactionResolution} when {@link Transaction} is not committed or roll-backed in time. + * {@link Transaction#commit()} and {@link Transaction#rollback()} does not promise that it would be applied + * successfully, so that checker here is necessary. + * + * <p>If {@link TransactionChecker#check(MessageView)} returns {@link TransactionResolution#UNKNOWN} or exception + * raised during the invocation of {@link TransactionChecker#check(MessageView)}, the examination from server will be + * performed periodically. + */ +public interface TransactionChecker { + /** + * Server will solve the suspended transactional message by this method. + * + * <p>If exception was thrown in this method, which equals {@link TransactionResolution#UNKNOWN} is returned. + * + * @param messageView message to determine {@link TransactionResolution}. + * @return the transaction resolution. + */ + TransactionResolution check(MessageView messageView); +} diff --git a/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java b/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java new file mode 100644 index 0000000..afaa4ff --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.producer; + +public enum TransactionResolution { + /** + * Notify server that current transaction should be committed. + */ + COMMIT, + /** + * Notify server that current transaction should be roll-backed. + */ + ROLLBACK, + /** + * Notify server that the state of this transaction is not sure. You should be cautions before return unknown + * because the examination from server will be performed periodically. + */ + UNKNOWN; +} diff --git a/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java b/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java new file mode 100644 index 0000000..8f2fd02 --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.retry; + +import java.time.Duration; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class BackOffRetryPolicyBuilder { + private int maxAttempts = 3; + private Duration initialBackoff = Duration.ofMillis(100); + private Duration maxBackoff = Duration.ofSeconds(1); + private int backoffMultiplier = 2; + + public BackOffRetryPolicyBuilder() { + } + + BackOffRetryPolicyBuilder setMaxAttempts(int maxAttempts) { + checkArgument(maxAttempts > 0, "maxAttempts must be positive"); + this.maxAttempts = maxAttempts; + return this; + } + + BackOffRetryPolicyBuilder setInitialBackoff(Duration initialBackoff) { + this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null"); + return this; + } + + BackOffRetryPolicyBuilder setMaxBackoff(Duration maxBackoff) { + this.maxBackoff = checkNotNull(maxBackoff, "maxBackoff should not be null"); + return this; + } + + BackOffRetryPolicyBuilder setBackoffMultiplier(int backoffMultiplier) { + checkArgument(backoffMultiplier > 0, "backoffMultiplier must be positive"); + this.backoffMultiplier = backoffMultiplier; + return this; + } + + BackoffRetryPolicy build() { + return new BackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier); + } +} diff --git a/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java b/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java new file mode 100644 index 0000000..2c5e0ff --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.retry; + +import com.google.common.base.MoreObjects; +import java.time.Duration; +import java.util.Random; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The {@link BackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to + * <a href="https://github.com/grpc/proposal/blob/master/A6-client-retries.md">gRPC Retry Design</a>. + */ +public class BackoffRetryPolicy implements RetryPolicy { + public static BackOffRetryPolicyBuilder newBuilder() { + return new BackOffRetryPolicyBuilder(); + } + + private final Random random; + private final int maxAttempts; + private final Duration initialBackoff; + private final Duration maxBackoff; + private final int backoffMultiplier; + + public BackoffRetryPolicy(int maxAttempts, Duration initialBackoff, Duration maxBackoff, int backoffMultiplier) { + checkArgument(maxBackoff.compareTo(initialBackoff) <= 0, "initialBackoff should not be minor than maxBackoff"); + checkArgument(maxAttempts > 0, "maxAttempts must be positive"); + this.random = new Random(); + this.maxAttempts = maxAttempts; + this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null"); + this.maxBackoff = maxBackoff; + this.backoffMultiplier = backoffMultiplier; + } + + @Override + public int getMaxAttempts() { + return maxAttempts; + } + + @Override + public Duration getNextAttemptDelay(int attempt) { + checkArgument(attempt > 0, "attempt must be positive"); + int randomNumberBound = Math.min(initialBackoff.getNano() * (backoffMultiplier ^ (attempt - 1)), + maxBackoff.getNano()); + return Duration.ofNanos(random.nextInt(randomNumberBound)); + } + + public Duration getInitialBackoff() { + return initialBackoff; + } + + public Duration getMaxBackoff() { + return maxBackoff; + } + + public int getBackoffMultiplier() { + return backoffMultiplier; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxAttempts", maxAttempts) + .add("initialBackoff", initialBackoff) + .add("maxBackoff", maxBackoff) + .add("backoffMultiplier", backoffMultiplier) + .toString(); + } +} diff --git a/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java b/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java new file mode 100644 index 0000000..c890dcf --- /dev/null +++ b/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client.apis.retry; + +import java.time.Duration; + +/** + * Internal interface for retry policy. + */ +interface RetryPolicy { + /** + * Get the max attempt times for retry. + * + * @return max attempt times. + */ + int getMaxAttempts(); + + /** + * Get await time after current attempts, the attempt index starts at 1. + * + * @param attempt current attempt. + * @return await time. + */ + Duration getNextAttemptDelay(int attempt); +} diff --git a/java/pom.xml b/java/client-java/pom.xml similarity index 62% copy from java/pom.xml copy to java/client-java/pom.xml index 049a500..a5be176 100644 --- a/java/pom.xml +++ b/java/client-java/pom.xml @@ -2,15 +2,14 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-client-java-parent</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>5.0.0-SNAPSHOT</version> + </parent> <modelVersion>4.0.0</modelVersion> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client-java-parent</artifactId> - <packaging>pom</packaging> - <version>5.0.0-SNAPSHOT</version> - <modules> - <module>apis</module> - </modules> + <artifactId>rocketmq-client-java</artifactId> <properties> <maven.compiler.release>8</maven.compiler.release> diff --git a/java/pom.xml b/java/pom.xml index 049a500..39886f4 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -9,7 +9,8 @@ <packaging>pom</packaging> <version>5.0.0-SNAPSHOT</version> <modules> - <module>apis</module> + <module>client-apis</module> + <module>client-java</module> </modules> <properties>
