This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 6672a68 Add client quickstart example for java (#85) 6672a68 is described below commit 6672a68de62cafb13e9fabc2c825492397450de5 Author: Aaron Ai <yangkun....@gmail.com> AuthorDate: Thu Jul 28 21:17:34 2022 +0800 Add client quickstart example for java (#85) --- .gitignore | 3 - .../client/java/example/AsyncProducerExample.java | 82 +++++++++++++++++++ .../java/example/AsyncSimpleConsumerExample.java | 95 ++++++++++++++++++++++ .../java/example/ProducerDelayMessageExample.java | 83 +++++++++++++++++++ .../java/example/ProducerFifoMessageExample.java | 81 ++++++++++++++++++ .../java/example/ProducerNormalMessageExample.java | 79 ++++++++++++++++++ .../example/ProducerTransactionMessageExample.java | 94 +++++++++++++++++++++ .../client/java/example/PushConsumerExample.java | 72 ++++++++++++++++ .../client/java/example/SimpleConsumerExample.java | 82 +++++++++++++++++++ 9 files changed, 668 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index ee46af5..68a6ad8 100644 --- a/.gitignore +++ b/.gitignore @@ -25,8 +25,5 @@ target/ dependency-reduced-pom.xml .flattened-pom.xml -# Java -java/client/src/main/java/org/apache/rocketmq/client/java/example/ - # C# obj/ diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java new file mode 100644 index 0000000..de0f390 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncProducerExample { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProducerExample.class); + + private AsyncProducerExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String topic = "yourTopic"; + final Producer producer = provider.newProducerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before + // message publishing. + .setTopics(topic) + // May throw {@link ClientException} if the producer is not initialized. + .build(); + // Define your message body. + byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); + String tag = "yourMessageTagA"; + final Message message = provider.newMessageBuilder() + // Set topic for the current message. + .setTopic(topic) + // Message secondary classifier of message besides topic. + .setTag(tag) + // Key(s) of the message, another way to mark message besides message id. + .setKeys("yourMessageKey-0e094a5f9d85") + .setBody(body) + .build(); + final CompletableFuture<SendReceipt> future = producer.sendAsync(message); + future.whenComplete((sendReceipt, throwable) -> { + if (null == throwable) { + LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); + } else { + LOGGER.error("Failed to send message", throwable); + } + }); + // Close the producer when you don't need it anymore. + producer.close(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java new file mode 100644 index 0000000..2bda9b5 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.SimpleConsumer; +import org.apache.rocketmq.client.apis.message.MessageId; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncSimpleConsumerExample { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class); + + private AsyncSimpleConsumerExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String consumerGroup = "yourConsumerGroup"; + Duration awaitDuration = Duration.ofSeconds(30); + String tag = "yourMessageTagA"; + String topic = "yourTopic"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + SimpleConsumer consumer = provider.newSimpleConsumerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the consumer group name. + .setConsumerGroup(consumerGroup) + // set await duration for long-polling. + .setAwaitDuration(awaitDuration) + // Set the subscription for the consumer. + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .build(); + // Max message num for each long polling. + int maxMessageNum = 16; + // Set message invisible duration after it is received. + Duration invisibleDuration = Duration.ofSeconds(30); + final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum, invisibleDuration); + future0.thenAccept(message -> { + final Map<MessageId, CompletableFuture<Void>> map = + message.stream().collect(Collectors.toMap(MessageView::getMessageId, consumer::ackAsync)); + for (Map.Entry<MessageId, CompletableFuture<Void>> entry : map.entrySet()) { + final MessageId messageId = entry.getKey(); + final CompletableFuture<Void> future = entry.getValue(); + future.thenAccept(v -> LOGGER.info("Message is acknowledged successfully, messageId={}", messageId)) + .exceptionally(throwable -> { + LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId); + return null; + }); + } + }).exceptionally(t -> { + LOGGER.error("Failed to receive message from remote", t); + return null; + }); + // Close the simple consumer when you don't need it anymore. + consumer.close(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java new file mode 100644 index 0000000..1fae792 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerDelayMessageExample { + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerDelayMessageExample.class); + + private ProducerDelayMessageExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String topic = "yourDelayTopic"; + final Producer producer = provider.newProducerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before + // message publishing. + .setTopics(topic) + // May throw {@link ClientException} if the producer is not initialized. + .build(); + // Define your message body.a + byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); + String tag = "yourMessageTagA"; + Duration messageDelayTime = Duration.ofSeconds(10); + final Message message = provider.newMessageBuilder() + // Set topic for the current message. + .setTopic(topic) + // Message secondary classifier of message besides topic. + .setTag(tag) + // Key(s) of the message, another way to mark message besides message id. + .setKeys("yourMessageKey-3ee439f945d7") + // Set expected delivery timestamp of message. + .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) + .setBody(body) + .build(); + try { + final SendReceipt sendReceipt = producer.send(message); + LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); + } catch (Throwable t) { + LOGGER.error("Failed to send message", t); + } + // Close the producer when you don't need it anymore. + producer.close(); + } +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java new file mode 100644 index 0000000..e087e46 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerFifoMessageExample { + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class); + + private ProducerFifoMessageExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String topic = "yourFifoTopic"; + final Producer producer = provider.newProducerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before + // message publishing. + .setTopics(topic) + // May throw {@link ClientException} if the producer is not initialized. + .build(); + // Define your message body. + byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); + String tag = "yourMessageTagA"; + final Message message = provider.newMessageBuilder() + // Set topic for the current message. + .setTopic(topic) + // Message secondary classifier of message besides topic. + .setTag(tag) + // Key(s) of the message, another way to mark message besides message id. + .setKeys("yourMessageKey-1ff69ada8e0e") + // Message group decides the message delivery order. + .setMessageGroup("youMessageGroup0") + .setBody(body) + .build(); + try { + final SendReceipt sendReceipt = producer.send(message); + LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); + } catch (Throwable t) { + LOGGER.error("Failed to send message", t); + } + // Close the producer when you don't need it anymore. + producer.close(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java new file mode 100644 index 0000000..c28800e --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerNormalMessageExample { + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerNormalMessageExample.class); + + private ProducerNormalMessageExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + String endpoints = "foobar.com:8081"; + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String topic = "yourNormalTopic"; + final Producer producer = provider.newProducerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before + // message publishing. + .setTopics(topic) + // May throw {@link ClientException} if the producer is not initialized. + .build(); + // Define your message body. + byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); + String tag = "yourMessageTagA"; + final Message message = provider.newMessageBuilder() + // Set topic for the current message. + .setTopic(topic) + // Message secondary classifier of message besides topic. + .setTag(tag) + // Key(s) of the message, another way to mark message besides message id. + .setKeys("yourMessageKey-1c151062f96e") + .setBody(body) + .build(); + try { + final SendReceipt sendReceipt = producer.send(message); + LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); + } catch (Throwable t) { + LOGGER.error("Failed to send message", t); + } + // Close the producer when you don't need it anymore. + producer.close(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java new file mode 100644 index 0000000..ba71357 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.apache.rocketmq.client.apis.producer.Transaction; +import org.apache.rocketmq.client.apis.producer.TransactionChecker; +import org.apache.rocketmq.client.apis.producer.TransactionResolution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerTransactionMessageExample { + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerTransactionMessageExample.class); + + private ProducerTransactionMessageExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String topic = "yourTransactionTopic"; + TransactionChecker checker = messageView -> { + LOGGER.info("Receive transactional message check, message={}", messageView); + // Return the transaction resolution according to your business logic. + return TransactionResolution.COMMIT; + }; + Producer producer = provider.newProducerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before + // message publishing. + .setTopics(topic) + // Set transactional checker. + .setTransactionChecker(checker) + .build(); + final Transaction transaction = producer.beginTransaction(); + // Define your message body. + byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); + String tag = "yourMessageTagA"; + final Message message = provider.newMessageBuilder() + // Set topic for the current message. + .setTopic(topic) + // Message secondary classifier of message besides topic. + .setTag(tag) + // Key(s) of the message, another way to mark message besides message id. + .setKeys("yourMessageKey-565ef26f5727") + .setBody(body) + .build(); + try { + final SendReceipt sendReceipt = producer.send(message, transaction); + LOGGER.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId()); + } catch (Throwable t) { + LOGGER.error("Failed to send message", t); + return; + } + // Commit the transaction. + transaction.commit(); + // Or rollback the transaction. + // transaction.rollback(); + // Close the producer when you don't need it anymore. + producer.close(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java new file mode 100644 index 0000000..79bfd3c --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.util.Collections; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PushConsumerExample { + private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); + + private PushConsumerExample() { + } + + public static void main(String[] args) throws ClientException, IOException, InterruptedException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String tag = "yourMessageTagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + String consumerGroup = "yourConsumerGroup"; + String topic = "yourTopic"; + PushConsumer pushConsumer = provider.newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the consumer group name. + .setConsumerGroup(consumerGroup) + // Set the subscription for the consumer. + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageView -> { + // Handle the received message and return consume result. + LOGGER.info("Consume message={}", messageView); + return ConsumeResult.SUCCESS; + }) + .build(); + // Block the main thread, no need for production environment. + Thread.sleep(Long.MAX_VALUE); + // Close the push consumer when you don't need it anymore. + pushConsumer.close(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java new file mode 100644 index 0000000..afbef23 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.example; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.SimpleConsumer; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleConsumerExample { + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class); + + private SimpleConsumerExample() { + } + + public static void main(String[] args) throws ClientException, IOException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + String endpoints = "foobar.com:8081"; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String consumerGroup = "yourConsumerGroup"; + Duration awaitDuration = Duration.ofSeconds(30); + String tag = "yourMessageTagA"; + String topic = "yourTopic"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + SimpleConsumer consumer = provider.newSimpleConsumerBuilder() + .setClientConfiguration(clientConfiguration) + // Set the consumer group name. + .setConsumerGroup(consumerGroup) + // set await duration for long-polling. + .setAwaitDuration(awaitDuration) + // Set the subscription for the consumer. + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .build(); + // Max message num for each long polling. + int maxMessageNum = 16; + // Set message invisible duration after it is received. + Duration invisibleDuration = Duration.ofSeconds(30); + final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration); + for (MessageView message : messages) { + try { + consumer.ack(message); + } catch (Throwable t) { + LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t); + } + } + // Close the simple consumer when you don't need it anymore. + consumer.close(); + } +}