This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b514353fb2 [IOTDB-3251] Add ForwardTrigger doc and change api package
(#6080)
b514353fb2 is described below
commit b514353fb2dae92dcaa4756e93f82570a2ee8d03
Author: 刘威 <[email protected]>
AuthorDate: Wed Jun 1 09:58:49 2022 +0800
[IOTDB-3251] Add ForwardTrigger doc and change api package (#6080)
---
docs/UserGuide/Process-Data/Triggers.md | 108 +++++++++++++++++++-
docs/zh/UserGuide/Process-Data/Triggers.md | 111 ++++++++++++++++++++-
.../db/engine/trigger/builtin/ForwardTrigger.java | 12 +--
.../http/HTTPForwardConfiguration.java | 2 +-
.../sink/{ => forward}/http/HTTPForwardEvent.java | 2 +-
.../{ => forward}/http/HTTPForwardHandler.java | 2 +-
.../mqtt/MQTTForwardConfiguration.java | 2 +-
.../sink/{ => forward}/mqtt/MQTTForwardEvent.java | 2 +-
.../{ => forward}/mqtt/MQTTForwardHandler.java | 2 +-
9 files changed, 229 insertions(+), 14 deletions(-)
diff --git a/docs/UserGuide/Process-Data/Triggers.md
b/docs/UserGuide/Process-Data/Triggers.md
index d1d47f6914..8f185e2aec 100644
--- a/docs/UserGuide/Process-Data/Triggers.md
+++ b/docs/UserGuide/Process-Data/Triggers.md
@@ -307,7 +307,7 @@ When a user manages triggers, 4 types of authorities will
be involved:
* `START_TRIGGER`: Only users with this authority are allowed to start
triggers. This authority is path dependent.
* `STOP_TRIGGER`: Only users with this authority are allowed to stop triggers.
This authority is path dependent.
-For more information, refer to [Authority Management
Statement](../Operation%20Manual/Administration.md).
+For more information, refer to [Authority Management
Statement](../Administration-Management/Administration.md).
@@ -679,7 +679,113 @@ annotations.put("description", "{{.alertname}}:
{{.series}} is {{.value}}");
alertManagerHandler.onEvent(new AlertManagerEvent(alertName, extraLabels,
annotations));
```
+#### ForwardSink
+Triggers can use ForwardSink to forward written data over HTTP and MQTT. ForwardSink provides two built-in handlers, HTTPForwardHandler and MQTTForwardHandler. To improve connection efficiency, all HTTPForwardHandlers share one connection pool, while MQTTForwardHandlers with the same host, port and username parameters share one connection pool.
+
+The difference between MQTTForwardHandler and MQTTHandler is that the former uses a connection pool while the latter does not, and their message formats also differ.
+
+See [ForwardTrigger](#ForwardTrigger) for an example.
+
+## ForwardTrigger
+ForwardTrigger is a built-in trigger for data distribution and forwarding. It uses ForwardSink and a consumption queue to process trigger events asynchronously and in batches. Asynchronous forwarding keeps a blocked forwarding call from blocking the system. The connection pool in ForwardSink lets connections be reused efficiently and safely, avoiding the overhead of frequently establishing and closing connections.
+
+<img
src="https://github.com/apache/iotdb-bin-resources/blob/main/docs/UserGuide/Process-Data/Triggers/ForwardQueueConsume.png?raw=true"
alt="Forward Queue Consume">
+
+### Trigger process
+1. A trigger event arrives.
+2. ForwardTrigger puts the event into the queue pool.
+3. The trigger event completes.
+
+### Queue pool consumption process
+1. Trigger events are put into queues grouped by device (round-robin if there is no device).
+2. Each queue consumer thread monitors its queue. When the wait times out or the maximum forwarding batch size is reached, it calls the handler to forward the batch.
+3. The handler serializes the trigger events in batch. After encapsulating the message, it calls the built-in connection pool to complete forwarding.
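
The consumption steps above can be sketched in Java roughly as follows. This is only an illustration under stated assumptions: `ForwardQueueSketch`, `queueIndex` and `nextBatch` are hypothetical names and are not the `BatchHandlerQueue` class imported by `ForwardTrigger`, and hashing the device name is just one plausible way to group events by device.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the BatchHandlerQueue class from the IoTDB source.
final class ForwardQueueSketch {
  private static final AtomicInteger ROUND_ROBIN = new AtomicInteger();

  // Step 1: choose a queue by device so that events of one device share a queue.
  // Assumption: without a device, queues are picked in round-robin order.
  static int queueIndex(String device, int queueNumber) {
    int key = (device != null) ? device.hashCode() : ROUND_ROBIN.getAndIncrement();
    return Math.floorMod(key, queueNumber);
  }

  // Step 2: collect events until the batch is full or the timeout elapses.
  static <E> List<E> nextBatch(BlockingQueue<E> queue, int maxBatch, long timeoutMillis) {
    List<E> batch = new ArrayList<>();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    try {
      while (batch.size() < maxBatch) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          break; // timed out: forward whatever has been collected
        }
        E event = queue.poll(remaining, TimeUnit.NANOSECONDS);
        if (event == null) {
          break; // nothing arrived before the deadline
        }
        batch.add(event);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
    }
    return batch; // Step 3 would hand this batch to a handler for serialization
  }
}
```

A consumer thread would call `nextBatch` in a loop and pass each non-empty batch to the handler, so a slow receiver delays only that queue and not the write path.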
+
+### Message format
+At present, only a fixed-template JSON message format is supported. The template is as follows:
+```
+[{"device":"%s","measurement":"%s","timestamp":%d,"value":%s}]
+```
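
As a rough illustration of how such a template could be filled in, the Java sketch below formats each event with the object part of the template and joins the results into one JSON array. The class and method names are hypothetical, and passing the value as an already rendered JSON literal is an assumption made for brevity; the actual serialization in `HTTPForwardEvent` and `MQTTForwardEvent` may differ.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical sketch of filling the fixed JSON template; not the IoTDB implementation.
final class ForwardMessageSketch {
  // Object part of the template shown above, without the surrounding brackets.
  static final String PAYLOAD_TEMPLATE =
      "{\"device\":\"%s\",\"measurement\":\"%s\",\"timestamp\":%d,\"value\":%s}";

  // Assumption: valueLiteral is already valid JSON text, such as 1.0 or true.
  static String formatEvent(
      String device, String measurement, long timestamp, String valueLiteral) {
    return String.format(
        Locale.ROOT, PAYLOAD_TEMPLATE, device, measurement, timestamp, valueLiteral);
  }

  // Joins the formatted events of one batch into a single JSON array.
  static String formatBatch(List<String> formattedEvents) {
    return "[" + String.join(",", formattedEvents) + "]";
  }
}
```

For example, `formatEvent("root.http.d1", "s1", 1L, "1.0")` fills the four placeholders in order, and `formatBatch` wraps one or more such objects in square brackets.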
+
+### Example
+#### Create ForwardTrigger
+Create a forward_http trigger that uses the HTTP protocol and a forward_mqtt trigger that uses the MQTT protocol. They subscribe to the prefix paths `root.http` and `root.mqtt` respectively.
+```sql
+CREATE trigger forward_http AFTER INSERT ON root.http
+AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger'
+WITH ('protocol' = 'http', 'endpoint' = 'http://127.0.0.1:8080/forward_receive')
+
+CREATE trigger forward_mqtt AFTER INSERT ON root.mqtt
+AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger'
+WITH ('protocol' = 'mqtt', 'host' = '127.0.0.1', 'port' = '1883', 'username' = 'root', 'password' = 'root', 'topic' = 'mqtt-test')
+```
+
+#### Fire trigger
+Insert data into sub-paths of the two prefix paths to fire the triggers.
+```sql
+INSERT INTO root.http.d1(timestamp, s1) VALUES (1, 1);
+INSERT INTO root.http.d1(timestamp, s2) VALUES (2, true);
+INSERT INTO root.mqtt.d1(timestamp, s1) VALUES (1, 1);
+INSERT INTO root.mqtt.d1(timestamp, s2) VALUES (2, true);
+```
+
+#### Receive forwarded message
+After the trigger is fired, JSON data in the following format is received at the HTTP receiving end:
+```json
+[
+ {
+ "device":"root.http.d1",
+ "measurement":"s1",
+ "timestamp":1,
+ "value":1.0
+ },
+ {
+ "device":"root.http.d1",
+ "measurement":"s2",
+ "timestamp":2,
+ "value":true
+ }
+]
+```
+
+After the trigger is fired, JSON data in the following format is received at the MQTT receiving end:
+```json
+[
+ {
+ "device":"root.mqtt.d1",
+ "measurement":"s1",
+ "timestamp":1,
+ "value":1.0
+ },
+ {
+ "device":"root.mqtt.d1",
+ "measurement":"s2",
+ "timestamp":2,
+ "value":true
+ }
+]
+```
+
+### Config Parameter of ForwardTrigger
+| Parameter          | Required | Default      | Max  | Description |
+|--------------------|----------|--------------|------|-------------|
+| protocol           | true     | http         |      | Forwarding protocol, such as HTTP or MQTT |
+| queueNumber        |          | 8            | 8    | Number of queues. The smaller of this value and the global parameter trigger_forward_max_queue_number is used |
+| queueSize          |          | 2000         | 2000 | Size of each queue. The smaller of this value and the global parameter trigger_forward_max_size_per_queue is used |
+| batchSize          |          | 50           | 50   | Size of each forwarding batch. The smaller of this value and the global parameter trigger_forward_batch_size is used |
+| stopIfException    |          | false        |      | Whether to stop forwarding when an exception occurs |
+| endpoint           | true     |              |      | Request endpoint address (HTTP protocol parameter)<br/>Note: HTTP connection pool parameters depend on the global parameters:<br/>trigger_forward_http_pool_size=200<br/>and<br/>trigger_forward_http_pool_max_per_route=20 |
+| host               | true     |              |      | MQTT broker host (MQTT protocol parameter) |
+| port               | true     |              |      | MQTT broker port (MQTT protocol parameter) |
+| username           | true     |              |      | Username (MQTT protocol parameter) |
+| password           | true     |              |      | Password (MQTT protocol parameter) |
+| topic              | true     |              |      | Topic of the MQTT message (MQTT protocol parameter) |
+| reconnectDelay     |          | 10ms         |      | Reconnection waiting time (MQTT protocol parameter) |
+| connectAttemptsMax |          | 3            |      | Maximum number of connection attempts (MQTT protocol parameter) |
+| qos                |          | exactly_once |      | Quality of Service (MQTT protocol parameter); must be exactly_once, at_least_once or at_most_once |
+| poolSize           |          | 4            | 4    | MQTT connection pool size (MQTT protocol parameter). The smaller of this value and the global parameter trigger_forward_mqtt_pool_size is used |
+| retain             |          | false        |      | Whether the MQTT broker retains the message after publishing (MQTT protocol parameter) |
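
The "smaller of the two values" rule for queueNumber, queueSize, batchSize and poolSize can be sketched as below. This is a hedged illustration only: `ForwardLimitSketch` and `resolve` are hypothetical names, and reading attributes from a `Map<String, String>` is an assumption about how the `WITH (...)` clause reaches the trigger.

```java
import java.util.Map;

// Hypothetical sketch of capping a trigger attribute by a global parameter.
final class ForwardLimitSketch {
  // Uses the default when the attribute is absent, then applies the global cap.
  static int resolve(
      Map<String, String> attributes, String key, int defaultValue, int globalMax) {
    String raw = attributes.get(key);
    int requested = (raw == null) ? defaultValue : Integer.parseInt(raw.trim());
    return Math.min(requested, globalMax);
  }
}
```

With a global trigger_forward_batch_size of 50, a trigger created with `'batchSize' = '100'` would therefore forward at most 50 events per batch under this sketch.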
## Maven Project Example
diff --git a/docs/zh/UserGuide/Process-Data/Triggers.md
b/docs/zh/UserGuide/Process-Data/Triggers.md
index 94a9dc6029..18afebf9fc 100644
--- a/docs/zh/UserGuide/Process-Data/Triggers.md
+++ b/docs/zh/UserGuide/Process-Data/Triggers.md
@@ -277,7 +277,7 @@ SHOW TRIGGERS
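* `START_TRIGGER`: Only users with this authority are allowed to start triggers that have been stopped. This authority is path dependent.
* `STOP_TRIGGER`: Only users with this authority are allowed to stop running triggers. This authority is path dependent.
-For more information about user authorities, refer to [Authority Management Statement](../Operation%20Manual/Administration.md).
+For more information about user authorities, refer to [Authority Management Statement](../Administration-Management/Administration.md).
## Utility classes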
@@ -617,6 +617,115 @@ annotations.put("description", "{{.alertname}}:
{{.series}} is {{.value}}");
alertManagerHandler.onEvent(new AlertManagerEvent(alertName, extraLabels,
annotations));
```
+#### ForwardSink
+
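+Triggers can use ForwardSink to forward written data over the HTTP and MQTT protocols. ForwardSink has two built-in handlers, HTTPForwardHandler and MQTTForwardHandler. To improve connection efficiency, all HTTPForwardHandlers share one connection pool, while MQTTForwardHandlers with the same host, port and username parameters share one connection pool.
+
+The difference between MQTTForwardHandler and MQTTHandler is that the former uses a connection pool while the latter does not, and their message formats also differ.
+
+See [ForwardTrigger](#ForwardTrigger) for a usage example.
+
+## ForwardTrigger
+
+ForwardTrigger is a built-in trigger for data distribution. It uses ForwardSink and a consumption queue to process trigger events asynchronously and in batches. Forwarding asynchronously keeps a blocked forwarding call from blocking the system. The connection pool in ForwardSink lets pooled connections be reused efficiently and safely, avoiding the overhead of frequently establishing and closing connections.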
+
+<img
src="https://github.com/apache/iotdb-bin-resources/blob/main/docs/UserGuide/Process-Data/Triggers/ForwardQueueConsume.png?raw=true"
alt="Forward Queue Consume">
+
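+### Trigger process
+1. A trigger event arrives.
+2. ForwardTrigger puts the trigger event into the queue pool.
+3. The trigger event completes.
+
+### Queue pool consumption process
+1. Trigger events are enqueued by device (round-robin if there is no device).
+2. Each queue consumer thread monitors its queue. When the wait times out or the maximum forwarding batch size is reached, it calls the Handler to forward the batch.
+3. The Handler serializes the trigger events in batch. Once the message is encapsulated, it calls the built-in connection pool to complete forwarding.
+
+### Message format
+At present, only a fixed-template JSON message format is supported. The template is as follows: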
+```
+[{"device":"%s","measurement":"%s","timestamp":%d,"value":%s}]
+```
+
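+### Example
+#### Create ForwardTrigger
+Create a forward_http trigger that uses the HTTP protocol and a forward_mqtt trigger that uses the MQTT protocol. They subscribe to the prefix paths `root.http` and `root.mqtt` respectively.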
+```sql
+CREATE trigger forward_http AFTER INSERT ON root.http
+AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger'
+WITH ('protocol' = 'http', 'endpoint' = 'http://127.0.0.1:8080/forward_receive')
+
+CREATE trigger forward_mqtt AFTER INSERT ON root.mqtt
+AS 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger'
+WITH ('protocol' = 'mqtt', 'host' = '127.0.0.1', 'port' = '1883', 'username' = 'root', 'password' = 'root', 'topic' = 'mqtt-test')
+```
+
+#### Fire trigger
+Insert data into sub-paths of the two prefix paths to fire the triggers.
+```sql
+INSERT INTO root.http.d1(timestamp, s1) VALUES (1, 1);
+INSERT INTO root.http.d1(timestamp, s2) VALUES (2, true);
+INSERT INTO root.mqtt.d1(timestamp, s1) VALUES (1, 1);
+INSERT INTO root.mqtt.d1(timestamp, s2) VALUES (2, true);
+```
+
+#### Receive forwarded message
+After the trigger is fired, JSON data in the following format is received at the HTTP receiving end:
+```json
+[
+ {
+ "device":"root.http.d1",
+ "measurement":"s1",
+ "timestamp":1,
+ "value":1.0
+ },
+ {
+ "device":"root.http.d1",
+ "measurement":"s2",
+ "timestamp":2,
+ "value":true
+ }
+]
+```
+
+After the trigger is fired, JSON data in the following format is received at the MQTT receiving end:
+```json
+[
+ {
+ "device":"root.mqtt.d1",
+ "measurement":"s1",
+ "timestamp":1,
+ "value":1.0
+ },
+ {
+ "device":"root.mqtt.d1",
+ "measurement":"s2",
+ "timestamp":2,
+ "value":true
+ }
+]
+```
+
+### Configuration parameters of ForwardTrigger
+| Parameter          | Required | Default      | Max  | Description |
+|--------------------|----------|--------------|------|-------------|
+| protocol           | true     | http         |      | Forwarding protocol, such as HTTP or MQTT |
+| queueNumber        |          | 8            | 8    | Number of queues. The smaller of this value and the global parameter trigger_forward_max_queue_number is used |
+| queueSize          |          | 2000         | 2000 | Size of each queue. The smaller of this value and the global parameter trigger_forward_max_size_per_queue is used |
+| batchSize          |          | 50           | 50   | Maximum size of each forwarding batch. The smaller of this value and the global parameter trigger_forward_batch_size is used |
+| stopIfException    |          | false        |      | Whether to stop when an exception occurs |
+| endpoint           | true     |              |      | Request endpoint address (HTTP protocol parameter)<br/>Note: HTTP connection pool parameters depend on the global parameters<br/>trigger_forward_http_pool_size=200<br/>and<br/>trigger_forward_http_pool_max_per_route=20 |
+| host               | true     |              |      | MQTT broker host name (MQTT protocol parameter) |
+| port               | true     |              |      | MQTT broker port number (MQTT protocol parameter) |
+| username           | true     |              |      | Username (MQTT protocol parameter) |
+| password           | true     |              |      | Password (MQTT protocol parameter) |
+| topic              | true     |              |      | Topic of the MQTT message (MQTT protocol parameter) |
+| reconnectDelay     |          | 10ms         |      | Reconnection waiting time (MQTT protocol parameter) |
+| connectAttemptsMax |          | 3            |      | Maximum number of connection attempts (MQTT protocol parameter) |
+| qos                |          | exactly_once |      | Quality of Service (MQTT protocol parameter); one of exactly_once, at_least_once, at_most_once |
+| poolSize           |          | 4            | 4    | MQTT connection pool size (MQTT protocol parameter). The smaller of this value and the global parameter trigger_forward_mqtt_pool_size is used |
+| retain             |          | false        |      | Whether the MQTT broker retains the message after publishing (MQTT protocol parameter) |
+
## Complete Maven Example Project
If you use [Maven](http://search.maven.org/), you can refer to our sample project **trigger-example**.
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/builtin/ForwardTrigger.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/builtin/ForwardTrigger.java
index 0a22cf4d7e..7aa7523e3e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/builtin/ForwardTrigger.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/builtin/ForwardTrigger.java
@@ -26,12 +26,12 @@ import
org.apache.iotdb.db.engine.trigger.sink.api.Configuration;
import org.apache.iotdb.db.engine.trigger.sink.api.Event;
import org.apache.iotdb.db.engine.trigger.sink.api.Handler;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
-import org.apache.iotdb.db.engine.trigger.sink.http.HTTPForwardConfiguration;
-import org.apache.iotdb.db.engine.trigger.sink.http.HTTPForwardEvent;
-import org.apache.iotdb.db.engine.trigger.sink.http.HTTPForwardHandler;
-import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTForwardConfiguration;
-import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTForwardEvent;
-import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTForwardHandler;
+import
org.apache.iotdb.db.engine.trigger.sink.forward.http.HTTPForwardConfiguration;
+import org.apache.iotdb.db.engine.trigger.sink.forward.http.HTTPForwardEvent;
+import org.apache.iotdb.db.engine.trigger.sink.forward.http.HTTPForwardHandler;
+import
org.apache.iotdb.db.engine.trigger.sink.forward.mqtt.MQTTForwardConfiguration;
+import org.apache.iotdb.db.engine.trigger.sink.forward.mqtt.MQTTForwardEvent;
+import org.apache.iotdb.db.engine.trigger.sink.forward.mqtt.MQTTForwardHandler;
import org.apache.iotdb.db.engine.trigger.utils.BatchHandlerQueue;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.tsfile.utils.Binary;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardConfiguration.java
similarity index 96%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardConfiguration.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardConfiguration.java
index 530b784615..52c6ea21bf 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardConfiguration.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardConfiguration.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.sink.http;
+package org.apache.iotdb.db.engine.trigger.sink.forward.http;
import org.apache.iotdb.db.engine.trigger.sink.api.Configuration;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardEvent.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardEvent.java
similarity index 94%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardEvent.java
index bfb407a566..d83785454a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardEvent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.sink.http;
+package org.apache.iotdb.db.engine.trigger.sink.forward.http;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.trigger.sink.forward.ForwardEvent;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardHandler.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardHandler.java
similarity index 98%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardHandler.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardHandler.java
index 1c091653ad..1efcb2eab6 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/http/HTTPForwardHandler.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/http/HTTPForwardHandler.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.sink.http;
+package org.apache.iotdb.db.engine.trigger.sink.forward.http;
import org.apache.iotdb.db.engine.trigger.sink.api.Handler;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardConfiguration.java
similarity index 98%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardConfiguration.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardConfiguration.java
index 820ac925fe..b538aff003 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardConfiguration.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardConfiguration.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.sink.mqtt;
+package org.apache.iotdb.db.engine.trigger.sink.forward.mqtt;
import org.apache.iotdb.db.engine.trigger.sink.api.Configuration;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardEvent.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardEvent.java
similarity index 94%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardEvent.java
index bd99824ab1..b74cd33e95 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardEvent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.sink.mqtt;
+package org.apache.iotdb.db.engine.trigger.sink.forward.mqtt;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.trigger.sink.forward.ForwardEvent;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardHandler.java
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardHandler.java
similarity index 98%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardHandler.java
rename to
server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardHandler.java
index 175a9be5b9..2637c5e3c5 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTForwardHandler.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardHandler.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.sink.mqtt;
+package org.apache.iotdb.db.engine.trigger.sink.forward.mqtt;
import org.apache.iotdb.db.engine.trigger.sink.api.Handler;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;