xwm1992 commented on a change in pull request #540:
URL:
https://github.com/apache/incubator-eventmesh/pull/540#discussion_r716311689
##########
File path: docs/cn/blog/HTTP start.md
##########
@@ -0,0 +1,195 @@
+# HTTP的启动
+
+> 首先我们先将一下启动前的准备工作,也就是HTTPServer的初始化。
+>
+> 在EventMeshServer初始化的时候,会连带这初始化EventMeshHTTPServer,下面我就几点做出说明:
+
+
+
+1.
在初始化的时候,会初始化多个线程池,这个线程池中会有许多的阻塞的线程队列,当某一种事件发生的时候,就会根据事件来使用线程池工厂来为事件创建线程。【下面是源代码】
+
+```java
+public void initThreadPool() throws Exception {
+
+ // 批处理消息
+ BlockingQueue<Runnable> batchMsgThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize);
+ batchMsgExecutor =
ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,
batchMsgThreadPoolQueue, "eventMesh-batchMsg-", true);
+ // 发送消息,和上面的类似
+ BlockingQueue<Runnable> sendMsgThreadPoolQueue=...
+ // 推送消息,和上面的类似
+ BlockingQueue<Runnable> pushMsgThreadPoolQueue =...
+ // 客户端管理,和上面的类似
+ BlockingQueue<Runnable> clientManageThreadPoolQueue =...
+ // 管理线程池,和上面的类似
+ BlockingQueue<Runnable> adminThreadPoolQueue =...
+ // 回复消息,和上面的类似
+ BlockingQueue<Runnable> replyMessageThreadPoolQueue =...
+ }
+```
+
+2.
注册http请求处理器,这个处理器会针对http发过来的请求,获取到请求码,根据请求码注册对应的处理器,并且为这个事件用上面的线程池分配一个线程进行处理。【下面是源代码】
+
+```java
+ public void registerHTTPRequestProcessor() {
+ // 新建批量消息处理器
+ BatchSendMessageProcessor batchSendMessageProcessor = new
BatchSendMessageProcessor(this);
+ // 获取请求码,并且分配一个线程进行处理
+ registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(),
+ batchSendMessageProcessor, batchMsgExecutor);
+
+ BatchSendMessageV2Processor batchSendMessageV2Processor =...
+ // 同步消息处理器,和上面的类似
+ SendSyncMessageProcessor sendSyncMessageProcessor =...
+ // 异步消息处理器,和上面的类似
+ SendAsyncMessageProcessor sendAsyncMessageProcessor =...
+ // 管理指标处理器,和上面的类似
+ AdminMetricsProcessor adminMetricsProcessor =...
+ // 心跳处理器,和上面的类似
+ HeartBeatProcessor heartProcessor =...
+ // 订阅处理器,和上面的类似
+ SubscribeProcessor subscribeProcessor =...
+ // 和上面的类似
+ UnSubscribeProcessor unSubscribeProcessor =...
+ // 回复消息处理器,和上面的类似
+ ReplyMessageProcessor replyMessageProcessor =...
+ }
+```
+
+3. 这里就是http初始化的全部代码。
+
+```java
+public class EventMeshHTTPServer extends AbstractHTTPServer {
+ ...
+ //初始化
+ public void init() throws Exception {
+ logger.info("==================EventMeshHTTPServer
Initialing==================");
+ // 初始化线程组
+ super.init("eventMesh-http");
+ // 初始化线程池
+ initThreadPool();
+ // 对指标初始化,主要是把生成的指标用于后台数据处理
+ metrics = new HTTPMetricsServer(this);
+ metrics.init();
+ // 消费者管理初始化,主要是把httpServer注册到事件总线上
+ consumerManager = new ConsumerManager(this);
+ consumerManager.init();
+
+ producerManager = new ProducerManager(this);
+ producerManager.init();
+ // 重试
+ httpRetryer = new HttpRetryer(this);
+ httpRetryer.init();
+ // 注册http处理器
+ registerHTTPRequestProcessor();
+
+ logger.info("--------------------------EventMeshHTTPServer inited");
+ }
+ ...
+}
+```
+
+
+
+> 初始化完成之后,再启动http的服务器端,这里我也同样聊聊以下几点。
+
+1.
AbstractHTTPServer的启动,采用的是netty的异步模型框架搭建的。具体来讲,这里创建了两个线程池:`bossGroup`和`workerGroup`,前者是用来轮询`accept`事件并且和`client`建立连接的,后者是用来轮询`read`和`write`时间并且使用`handlers`处理`io`事件的。而且这里采用了回调机制,当调用发出后,并不一定立刻就能得到结果,而是在实际处理的时候调用这个组件完成后,通过状态、通知等回调告知调用者。
Review comment:
后者是用来轮询`read`和`write`时间并且使用`handlers`处理`io`事件的, 时间or事件?
##########
File path: docs/cn/blog/Implementation of pub and sub.md
##########
@@ -0,0 +1,344 @@
+# pub和sub的实现-tcp
+
+> Eventmesh-sdk-java作为客户端,与eventmesh-runtime通信,用于完成消息的发送和接收。
+>
+> 那么pub和sub作为客户端,在eventmesh中是怎么实现的?
+
+## 结构
+
+>
我们首先看到sdk-java中tcp这一部分的结构:`common`部分定义了tcp协议下广播消息、异步消息都会用到的公共方法,`impl`是对tcp的pub和sub的真正实现。
+
+
+
+
+
+> 下面来具体介绍一下三个实现类的功能,下面是对应的类图:
+>- DefaultEventMeshClient:实现了EventMeshClient接口,用来定义一个具备pub和sub功能的客户端。
+>- SimplePubClientImpl:实现了SimplePubClient接口,同时还继承了TcpClient ,用来定义一个具备pub功能的客户端。
+>- SimpleSubClientImpl:实现了SimpleSubClient接口,同时也继承了TcpClient ,用来定义一个具备sub功能的客户端。
+
+
+
+
+
+## 代码
+
+-
对于client的实现,首先是要和runtime进行连接,也就是和服务器进行连接,连接的过程是:client发送心跳包给server,server对心跳包进行回应,然后client发送连接请求,接收到server的回应之后,客户端和服务器连接成功。在EventMeshClient接口中,它是这样定义连接的:
+
+```java
+void init() throws Exception;
+void heartbeat() throws Exception;
+```
+
+- 在DefaultEventMeshClient中具体的实现如下:
+
+```java
+public void init() throws Exception {
+ this.subClient.init();
+ this.pubClient.init();
+}
+```
+
+```java
+public void heartbeat() throws Exception {
+ this.pubClient.heartbeat();
+ this.subClient.heartbeat();
+}
+```
+
+-
也就是说,一个客户端和服务器的连接被分成了两个部分,一个是pub客户端的连接,一个是sub客户端的连接。那我们具体来看到两个客户端的实现,因为两个客户端实现有点类似,所以我们这里以`SimpleSubClientImpl`为例进行说明:
+
+ > client和server的握手过程
+ >
+ > HELLO_REQUEST(2), //client发给server的握手请求
+ > HELLO_RESPONSE(3), //server回复client的握手请求
+
+```java
+//SimpleSubClientImpl
+public void init() throws Exception {
+ //指向server的启动入口
+ open(new Handler());
+ //设置了hello的header和body部分,client和server进行握手
+ hello();
+ logger.info("SimpleSubClientImpl|{}|started!", clientNo);
+}
+```
+
+> client和server的发送心跳包过程
+>
+> HEARTBEAT_REQUEST(0), //client发给server的心跳包
+>
+> HEARTBEAT_RESPONSE(1), //server回复client的心跳包
+
+```java
+//SimpleSubClientImpl
+public void heartbeat() throws Exception {
+ task = scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ //如果不是处于活跃状态,就重新连接
+ if (!isActive()) {
+ SimpleSubClientImpl.this.reconnect();
+ }
+ //client向server发送心跳包
+ Package msg = MessageUtils.heartBeat();
+ SimpleSubClientImpl.this.io(msg,
EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+ } catch (Exception ignore) {
+ }
+ }
+ }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT,
TimeUnit.MILLISECONDS);
+}
+```
+
+-
连接成功之后,pub可以实现发布消息,并且可以选择使用异步还是广播的方式进行发布;而sub实现对消息的订阅,并且时刻监听pub发布的消息,对pub发布的消息进行回应。在SimplePubClient和SimpleSubClient的接口中,它们是这样定义的:
+
+```java
+//SimplePubClient
+//对消息进行发布
+Package publish(Package msg, long timeout) throws Exception;
+//发送rr消息
+Package rr(Package msg, long timeout) throws Exception;
+//发送异步单播消息
+void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws
Exception;
+//广播消息
+void broadcast(Package msg, long timeout) throws Exception;
+```
+
+```java
+//SimpleSubClient
+//监听
+void listen() throws Exception;
+//订阅管理
+void subscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception;
+//取消订阅
+void unsubscribe() throws Exception;
+```
+
+- 下面介绍它们的具体实现
+
+```java
+//SimplePubClientImpl
+public class SimplePubClientImpl extends TcpClient implements SimplePubClient {
+ ...
+ /**
+ * 发送事件消息, 返回值是EventMesh 给了ACK
+ *
+ * @param msg
+ * @throws Exception
+ */
+ public Package publish(Package msg, long timeout) throws Exception {
+ logger.info("SimplePubClientImpl|{}|publish|send|type={}|msg={}",
clientNo, msg.getHeader().getCommand(), msg);
+ //请求上下文,发送事件消息
+ return io(msg, timeout);
+ }
+
+ /**
+ * 发送RR消息
+ *
+ * @param msg
+ * @param timeout
+ * @return
+ * @throws Exception
+ */
+ public Package rr(Package msg, long timeout) throws Exception {
+ logger.info("SimplePubClientImpl|{}|rr|send|type={}|msg={}", clientNo,
msg.getHeader().getCommand(), msg);
+ return io(msg, timeout);
+ }
+
+ /**
+ * 异步RR
+ *
+ * @param msg
+ * @param callback
+ * @param timeout
+ * @throws Exception
+ */
+ @Override
+ public void asyncRR(Package msg, AsyncRRCallback callback, long timeout)
throws Exception {
+ super.send(msg);
+ this.callbackConcurrentHashMap.put((String) RequestContext._key(msg),
callback);
+
+ }
+
+ /**
+ * 发送广播消息
+ *
+ * @param msg
+ * @param timeout
+ * @throws Exception
+ */
+ public void broadcast(Package msg, long timeout) throws Exception {
+ logger.info("SimplePubClientImpl|{}|broadcast|send|type={}|msg={}",
clientNo, msg.getHeader().getCommand(), msg);
+ super.send(msg);
+ }
+
+ ...
+}
+
+```
+
+```java
+public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
+ ...
+ //监听
+ public void listen() throws Exception {
+ //调用MessageUtils方法启动监听
+ Package request = MessageUtils.listen();
+ this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+ }
+
+ //订阅
+ public void subscribe(String topic, SubscriptionMode subscriptionMode,
SubcriptionType subcriptionType) throws Exception {
+ subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode,
subcriptionType));
+ Package request = MessageUtils.subscribe(topic, subscriptionMode,
subcriptionType);
+ this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+ }
+ //取消订阅
+ public void unsubscribe() throws Exception {
+ Package request = MessageUtils.unsubscribe();
+ this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+ }
+
+ ...
+}
+
+```
+
+- 最后就是断连,具体过程就是
+
+ > CLIENT_GOODBYE_REQUEST(4), //client主动断连时通知server
CLIENT_GOODBYE_RESPONSE(5), //server回复client的主动断连通知
+ > SERVER_GOODBYE_REQUEST(6), //server主动断连时通知client
+ > SERVER_GOODBYE_RESPONSE(7), //client回复server的主动断连通知
+
+- 在EventMeshClient接口中是这样定义的:
+
+```java
+void close();
+```
+
+- 在DefaultEventMeshClient中的具体实现:
+
+```java
+ public void close() {
+ this.pubClient.close();
+ this.subClient.close();
+}
+```
+
+```java
+//SimplePubClientImpl
+private void goodbye() throws Exception {
+ Package msg = MessageUtils.goodbye();
+ this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+}
+```
+
+```java
+//SimpleSubClientImpl
+private void goodbye() throws Exception {
+ Package msg = MessageUtils.goodbye();
+ this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+}
+```
+
+## 举例
+
+>
在eventmesh中有一个test模块,就是包含了tcp和http两种协议的演示,这些演示,就是对sdk-java作为客户端和runtime的server交互的一些举例,下面我们以tcp的单播消息来对上面的内容做一个举例说明
Review comment:
test改为examples
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]