This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/master by this push:
new 8f37ff2 [RIP-9] Commit docs Example_OpenMessaging.md (#797)
8f37ff2 is described below
commit 8f37ff27db02177a1e336c8d5ee1a089a4a23939
Author: xiongwu1 <[email protected]>
AuthorDate: Thu Feb 21 15:58:25 2019 +0800
[RIP-9] Commit docs Example_OpenMessaging.md (#797)
[RIP-9] Commit docs Example_OpenMessaging.md
---
docs/en/Example_OpenMessaging.md | 118 +++++++++++++++++++++++++++++++++++++++
1 file changed, 118 insertions(+)
diff --git a/docs/en/Example_OpenMessaging.md b/docs/en/Example_OpenMessaging.md
new file mode 100644
index 0000000..026e76e
--- /dev/null
+++ b/docs/en/Example_OpenMessaging.md
@@ -0,0 +1,118 @@
+# OpenMessaging Example
+[OpenMessaging](https://openmessaging.github.io/), which includes the
establishment of industry guidelines and messaging, streaming specifications to
provide a common framework for finance, ecommerce, IoT and big-data area. The
design principles are the cloud-oriented, simplicity, flexibility, and language
independent in distributed heterogeneous environments. Conformance to these
specifications will make it possible to develop a heterogeneous messaging
applications across all major plat [...]
+
+RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, the
following examples demonstrate how to access RocketMQ based on OpenMessaging.
+
+## OMSProducer
+The following example shows how to send message to RocketMQ broker in
synchronous, asynchronous, or one-way transmissions.
+
+```
+public class OMSProducer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
+
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+ final Producer producer = messagingAccessPoint.createProducer();
+
+ messagingAccessPoint.startup();
+ System.out.printf("MessagingAccessPoint startup OK%n");
+
+ producer.startup();
+ System.out.printf("Producer startup OK%n");
+
+ {
+ Message message =
producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ SendResult sendResult = producer.send(message);
+ System.out.printf("Send sync message OK, msgId: %s%n",
sendResult.messageId());
+ }
+
+ {
+ final Promise<SendResult> result =
producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ result.addListener(new PromiseListener<SendResult>() {
+ @Override
+ public void operationCompleted(Promise<SendResult> promise) {
+ System.out.printf("Send async message OK, msgId: %s%n",
promise.get().messageId());
+ }
+
+ @Override
+ public void operationFailed(Promise<SendResult> promise) {
+ System.out.printf("Send async message Failed, error:
%s%n", promise.getThrowable().getMessage());
+ }
+ });
+ }
+
+ {
+
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ System.out.printf("Send oneway message OK%n");
+ }
+
+ producer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+}
+```
+## OMSPullConsumer
+Use OMS PullConsumer to poll messages from a specified queue.
+
+```
+public class OMSPullConsumer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
+
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+ final PullConsumer consumer =
messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+ OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"OMS_CONSUMER"));
+
+ messagingAccessPoint.startup();
+ System.out.printf("MessagingAccessPoint startup OK%n");
+
+ consumer.startup();
+ System.out.printf("Consumer startup OK%n");
+
+ Message message = consumer.poll();
+ if (message != null) {
+ String msgId =
message.headers().getString(MessageHeader.MESSAGE_ID);
+ System.out.printf("Received one message: %s%n", msgId);
+ consumer.ack(msgId);
+ }
+
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+}
+
+```
+## OMSPushConsumer
+Attaches OMS PushConsumer to a specified queue and consumes messages by
MessageListener
+
+```
+public class OMSPushConsumer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
+
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+ final PushConsumer consumer = messagingAccessPoint.
+
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"OMS_CONSUMER"));
+
+ messagingAccessPoint.startup();
+ System.out.printf("MessagingAccessPoint startup OK%n");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+ }));
+
+ consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+ @Override
+ public void onMessage(final Message message, final
ReceivedMessageContext context) {
+ System.out.printf("Received one message: %s%n",
message.headers().getString(MessageHeader.MESSAGE_ID));
+ context.ack();
+ }
+ });
+
+ }
+}
+```