This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-doc in repository https://gitbox.apache.org/repos/asf/iotdb-docs.git
commit 85b9e7a5d13036ba551ac75ca5cede8f94fcc97d Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Aug 22 10:23:41 2023 +0800 pipe: update Chinese user docs --- .../V1.2.x/User-Manual/IoTDB-Data-Pipe_timecho.md | 212 +++++++++------------ 1 file changed, 91 insertions(+), 121 deletions(-) diff --git a/src/zh/UserGuide/V1.2.x/User-Manual/IoTDB-Data-Pipe_timecho.md b/src/zh/UserGuide/V1.2.x/User-Manual/IoTDB-Data-Pipe_timecho.md index 9318808..fe70bfc 100644 --- a/src/zh/UserGuide/V1.2.x/User-Manual/IoTDB-Data-Pipe_timecho.md +++ b/src/zh/UserGuide/V1.2.x/User-Manual/IoTDB-Data-Pipe_timecho.md @@ -29,7 +29,7 @@ - 处理(Process) - 发送(Connect) -**Pipe 允许用户自定义三个子任务的处理逻辑,通过类似 UDF 的方式处理数据。**在一个 Pipe 中,上述的子任务分别由三种插件执行实现,数据会依次经过这三个插件进行处理:Pipe Extractor 用于抽取数据,Pipe Processor 用于处理数据,Pipe Connector 用于发送数据,最终数据将被发至外部系统。 +**Pipe 允许用户自定义三个子任务的处理逻辑,通过类似 UDF 的方式处理数据。** 在一个 Pipe 中,上述的子任务分别由三种插件执行实现,数据会依次经过这三个插件进行处理:Pipe Extractor 用于抽取数据,Pipe Processor 用于处理数据,Pipe Connector 用于发送数据,最终数据将被发至外部系统。 **Pipe 任务的模型如下:** @@ -78,7 +78,9 @@ > ❗️**注:目前的 IoTDB -> IoTDB 的数据订阅实现并不支持 DDL 同步** > -> 即:不支持 ttl,trigger,别名,模板,视图,创建/删除序列,创建/删除存储组等操作**IoTDB -> IoTDB 的数据订阅要求目标端 IoTDB:** +> 即:不支持 ttl,trigger,别名,模板,视图,创建/删除序列,创建/删除存储组等操作 +> +> **IoTDB -> IoTDB 的数据订阅要求目标端 IoTDB:** > > * 开启自动创建元数据:需要人工配置数据类型的编码和压缩与发送端保持一致 > * 不开启自动创建元数据:手工创建与源端一致的元数据 @@ -171,7 +173,6 @@ WITH CONNECTOR ( 'connector' = 'iotdb-thrift-connector', 'connector.thrift.port' = '9999', 'connector.thrift.host' = 'localhost', - 'connector.id' = '1', ) ``` @@ -247,12 +248,12 @@ WHERE CONNECTOR USED BY <PipeId> 一个数据订阅 pipe 在其被管理的生命周期中会经过多种状态: -- **STOPPED:**pipe 处于停止运行状态。当管道处于该状态时,有如下几种可能: +- **STOPPED:** pipe 处于停止运行状态。当管道处于该状态时,有如下几种可能: - 当一个 pipe 被成功创建之后,其初始状态为暂停状态 - 用户手动将一个处于正常运行状态的 pipe 暂停,其状态会被动从 RUNNING 变为 STOPPED - 当一个 pipe 运行过程中出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED -- **RUNNING:**pipe 正在正常工作 -- **DROPPED:**pipe 任务被永久删除 +- **RUNNING:** pipe 正在正常工作 +- **DROPPED:** pipe 任务被永久删除 下图表明了所有状态以及状态的迁移: @@ -307,8 +308,8 @@ WHERE CONNECTOR USED BY <PipeId> > ✅ **一条数据从生产到落库 IoTDB,包含两个关键的时间概念** > -> * **event time:**数据实际生产时的时间(或者数据生产系统给数据赋予的生成时间,是数据点中的时间项),也称为事件时间。 -> * **arrival time:**数据到达 IoTDB 系统内的时间。 +> * **event time:** 数据实际生产时的时间(或者数据生产系统给数据赋予的生成时间,是数据点中的时间项),也称为事件时间。 +> * **arrival time:** 数据到达 IoTDB 系统内的时间。 > > 我们常说的乱序数据,指的是数据到达时,其 **event time** 远落后于当前系统时间(或者已经落库的最大 **event > time**)的数据。另一方面,不论是乱序数据还是顺序数据,只要它们是新到达系统的,那它们的 **arrival time** 都是会随着数据到达 > IoTDB 的顺序递增的。 @@ -348,36 +349,44 @@ WHERE CONNECTOR USED BY <PipeId> ### 预置 connector -#### iotdb-thrift-connector-v1(别名:iotdb-thrift-connector) +#### iotdb-thrift-sync-connector(别名:iotdb-thrift-connector) -作用:主要用于 IoTDB(v1.2.0+)与 IoTDB(v1.2.0+)之间的数据传输。使用 Thrift RPC 框架传输数据,单线程 blocking IO 模型。保证接收端 apply 数据的顺序与发送端接受写入请求的顺序一致。 +作用:主要用于 IoTDB(v1.2.0+)与 IoTDB(v1.2.0+)之间的数据传输。 +使用 Thrift RPC 框架传输数据,单线程 blocking IO 模型。 +保证接收端 apply 数据的顺序与发送端接受写入请求的顺序一致。 限制:源端 IoTDB 与 目标端 IoTDB 版本都需要在 v1.2.0+。 -| key | value | value 取值范围 | required or optional with default | -| -------------- | --------------------------------------------------- | ----------------------------------------------------------- | --------------------------------- | -| connector | iotdb-thrift-connector 或 iotdb-thrift-connector-v1 | String: iotdb-thrift-connector 或 iotdb-thrift-connector-v1 | required | -| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | required | -| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | required | +| key | value | value 取值范围 | required or optional with default | +| -------------- | --------------------------------------------------- |---------------------------------------------------------------------------|------------------------------------------| +| connector | iotdb-thrift-connector 或 iotdb-thrift-sync-connector | String: iotdb-thrift-connector 或 iotdb-thrift-sync-connector | required | +| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | optional: 与 connector.node-urls 任选其一填写 | +| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | optional: 与 connector.node-urls 任选其一填写 | +| connector.node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url | String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | optional: 与 connector.ip:connector.port 任选其一填写 | > 📌 请确保接收端已经创建了发送端的所有时间序列,或是开启了自动创建元数据,否则将会导致 pipe 运行失败。 -#### iotdb-thrift-connector-v2 +#### iotdb-thrift-async-connector -作用:主要用于 IoTDB(v1.2.0+)与 IoTDB(v1.2.0+)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景。不保证接收端 apply 数据的顺序与发送端接受写入请求的顺序一致,但是保证数据发送的完整性(at-least-once)。 +作用:主要用于 IoTDB(v1.2.0+)与 IoTDB(v1.2.0+)之间的数据传输。 +使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景。 +不保证接收端 apply 数据的顺序与发送端接受写入请求的顺序一致,但是保证数据发送的完整性(at-least-once)。 限制:源端 IoTDB 与 目标端 IoTDB 版本都需要在 v1.2.0+。 -| key | value | value 取值范围 | required or optional with default | -| ------------------- | ------------------------------------------------------- | ------------------------------------------------------------ | --------------------------------- | -| connector | iotdb-thrift-connector-v2 | String: iotdb-thrift-connector-v2 | required | -| connector.node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url | String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669''127.0.0.1:6667' | required | +| key | value | value 取值范围 | required or optional with default | +| -------------- |-------------------------------------------------------|---------------------------------------------------------------------------|------------------------------------------| +| connector | iotdb-thrift-async-connector | String: iotdb-thrift-async-connector | required | +| connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | optional: 与 connector.node-urls 任选其一填写 | +| connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | optional: 与 connector.node-urls 任选其一填写 | +| connector.node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url | String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | optional: 与 connector.ip:connector.port 任选其一填写 | > 📌 请确保接收端已经创建了发送端的所有时间序列,或是开启了自动创建元数据,否则将会导致 pipe 运行失败。 -#### iotdb-sync-connector +#### iotdb-legacy-pipe-connector -作用:主要用于 IoTDB(v1.2.0+)向更低版本的 IoTDB 传输数据,使用 v1.2.0 版本前的数据同步(Sync)协议。使用 Thrift RPC 框架传输数据。单线程 sync blocking IO 模型,传输性能较弱。 +作用:主要用于 IoTDB(v1.2.0+)向更低版本的 IoTDB 传输数据,使用 v1.2.0 版本前的数据同步(Sync)协议。 +使用 Thrift RPC 框架传输数据。单线程 sync blocking IO 模型,传输性能较弱。 限制:源端 IoTDB 版本需要在 v1.2.0+,目标端 IoTDB 版本可以是 v1.2.0+、v1.1.x(更低版本的 IoTDB 理论上也支持,但是未经测试)。 @@ -385,7 +394,7 @@ WHERE CONNECTOR USED BY <PipeId> | key | value | value 取值范围 | required or optional with default | | ------------------ | ------------------------------------------------------------ | ---------------------------- | --------------------------------- | -| connector | iotdb-sync-connector | String: iotdb-sync-connector | required | +| connector | iotdb-legacy-pipe-connector | String: iotdb-legacy-pipe-connector | required | | connector.ip | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip | String | required | | connector.port | 目标端 IoTDB 其中一个 DataNode 节点的数据服务 port | Integer | required | | connector.user | 目标端 IoTDB 的用户名,注意该用户需要支持数据写入、TsFile Load 的权限 | String | optional: root | @@ -442,16 +451,16 @@ public interface TabletInsertionEvent extends Event { /** * The consumer processes the data row by row and collects the results by RowCollector. * - * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent contains the results - * collected by the RowCollector + * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the + * results collected by the RowCollector */ Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer); /** * The consumer processes the Tablet directly and collects the results by RowCollector. * - * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent contains the results - * collected by the RowCollector + * @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the + * results collected by the RowCollector */ Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer); } @@ -469,7 +478,7 @@ IoTDB 的存储引擎是 LSM 结构的。数据写入时会先将写入操作落 (1)历史数据抽取:一个同步任务开始前,所有已经落盘的写入数据都会以 TsFile 的形式存在。一个同步任务开始后,采集历史数据时,历史数据将以 TsFileInsertionEvent 作为抽象; -1. (2)实时数据抽取:一个同步任务进行时,当数据流中实时处理操作日志写入事件的速度慢于写入请求速度一定进度之后,未来得及处理的操作日志写入事件会被被持久化至磁盘,以 TsFile 的形式存在,这一些数据被同步引擎采集到后,会以 TsFileInsertionEvent 作为抽象。 +(2)实时数据抽取:一个同步任务进行时,当数据流中实时处理操作日志写入事件的速度慢于写入请求速度一定进度之后,未来得及处理的操作日志写入事件会被被持久化至磁盘,以 TsFile 的形式存在,这一些数据被同步引擎采集到后,会以 TsFileInsertionEvent 作为抽象。 ```java /** @@ -481,7 +490,7 @@ public interface TsFileInsertionEvent extends Event { /** * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents. * - * @return the list of TsFileInsertionEvent + * @return {@code Iterable<TabletInsertionEvent>} the list of TabletInsertionEvent */ Iterable<TabletInsertionEvent> toTabletInsertionEvents(); } @@ -548,7 +557,7 @@ public interface PipeExtractor extends PipePlugin { * @throws Exception the user can throw errors if necessary */ void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) - throws Exception; + throws Exception; /** * Start the extractor. After this method is called, events should be ready to be supplied by @@ -633,7 +642,7 @@ public interface PipeProcessor extends PipePlugin { * @throws Exception the user can throw errors if necessary */ void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) - throws Exception; + throws Exception; /** * This method is called to process the TabletInsertionEvent. @@ -643,7 +652,7 @@ public interface PipeProcessor extends PipePlugin { * @throws Exception the user can throw errors if necessary */ void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) - throws Exception; + throws Exception; /** * This method is called to process the TsFileInsertionEvent. @@ -652,8 +661,13 @@ public interface PipeProcessor extends PipePlugin { * @param eventCollector used to collect result events after processing * @throws Exception the user can throw errors if necessary */ - void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) - throws Exception; + default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws Exception { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); + } + } /** * This method is called to process the Event. @@ -736,7 +750,7 @@ public interface PipeConnector extends PipePlugin { * @throws Exception the user can throw errors if necessary */ void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) - throws Exception; + throws Exception; /** * This method is used to create a connection with sink. This method will be called after the @@ -771,7 +785,12 @@ public interface PipeConnector extends PipePlugin { * @throws PipeConnectionException if the connection is broken * @throws Exception the user can throw errors if necessary */ - void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception; + default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + transfer(tabletInsertionEvent); + } + } /** * This method is used to transfer the Event. @@ -844,6 +863,41 @@ SHOW PIPEPLUGINS | DROP_PIPEPLUGIN | 开启流水线插件。路径无关。 | | SHOW_PIPEPLUGINS | 查询流水线插件。路径无关。 | +## 配置参数 + +在 iotdb-common.properties 中: + +```Properties +#################### +### Pipe Configuration +#################### + +# Uncomment the following field to configure the pipe lib directory. +# For Windows platform +# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is +# absolute. Otherwise, it is relative. +# pipe_lib_dir=ext\\pipe +# For Linux platform +# If its prefix is "/", then the path is absolute. Otherwise, it is relative. +# pipe_lib_dir=ext/pipe + +# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. +# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). +# pipe_subtask_executor_max_thread_num=5 + +# The connection timeout (in milliseconds) for the thrift client. +# pipe_connector_timeout_ms=900000 + +# The maximum number of selectors that can be used in the async connector. +# pipe_async_connector_selector_number=1 + +# The core number of clients that can be used in the async connector. +# pipe_async_connector_core_client_number=8 + +# The maximum number of clients that can be used in the async connector. +# pipe_async_connector_max_client_number=16 +``` + ## 功能特性 ### 最少一次语义保证 **at-least-once** @@ -877,87 +931,3 @@ SHOW PIPEPLUGINS - 当发送端集群某数据节点宕机时,数据订阅框架可以利用一致性快照以及保存在副本上的数据快速恢复同步,以此实现数据订阅服务的高可用。 - 当发送端集群整体宕机并重启时,数据订阅框架也能使用快照恢复同步服务。 - -## 配置参数 - -在 iotdb-common.properties 中: - -```Properties -#################### -### Pipe Configuration -#################### - -# Uncomment the following field to configure the pipe lib directory. -# For Window platform -# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is -# absolute. Otherwise, it is relative. -# pipe_lib_dir=ext\\pipe -# For Linux platform -# If its prefix is "/", then the path is absolute. Otherwise, it is relative. -pipe_lib_dir=ext/pipe - -# The name of the directory that stores the tsfiles temporarily hold or generated by the pipe module. -# The directory is located in the data directory of IoTDB. -pipe_hardlink_tsfile_dir_name=pipe - -# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. -pipe_subtask_executor_max_thread_num=5 - -# The number of events that need to be consumed before a checkpoint is triggered. -pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count=10000 - -# The time duration (in milliseconds) between checkpoints. -pipe_subtask_executor_basic_check_point_interval_by_time_duration=10000 - -# The maximum blocking time (in milliseconds) for the pending queue. -pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000 - -# The default size of ring buffer in the realtime extractor's disruptor queue. -pipe_extractor_assigner_disruptor_ring_buffer_size=65536 - -# The maximum number of entries the deviceToExtractorsCache can hold. -pipe_extractor_matcher_cache_size=1024 - -# The capacity for the number of tablet events that can be stored in the pending queue of the hybrid realtime extractor. -pipe_extractor_pending_queue_capacity=128 - -# The limit for the number of tablet events that can be held in the pending queue of the hybrid realtime extractor. -# Noted that: this should be less than or equals to realtimeExtractorPendingQueueCapacity -pipe_extractor_pending_queue_tablet_limit=64 - -# The buffer size used for reading file during file transfer. -pipe_connector_read_file_buffer_size=8388608 - -# The delay period (in milliseconds) between each retry when a connection failure occurs. -pipe_connector_retry_interval_ms=1000 - -# The size of the pending queue for the PipeConnector to store the events. -pipe_connector_pending_queue_size=1024 - -# The number of heartbeat loop cycles before collecting pipe meta once -pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100 - -# The initial delay before starting the PipeMetaSyncer service. -pipe_meta_syncer_initial_sync_delay_minutes=3 - -# The sync regular interval (in minutes) for the PipeMetaSyncer service. -pipe_meta_syncer_sync_interval_minutes=3 -``` - -# 备份场景 -IoTDB 的数据订阅工具可以处理 IoTDB 的自动实时备份需求。在经典的主从场景中,副本可以作为订阅的接收端而存在,可实时从发送端异步备份数据,不阻塞发送端的写入。在接收端和发送端均正常的情况下,就可进行数据的传输,保证接收端和发送端的数据最终一致。 - -## 备份示例 -使用 tsFile 的自动实时备份方式如下: -```bash -create pipe p1 - with extractor ( - 'extractor.realtime.mode'='file' - ) - with connector ( - 'connector'= 'iotdb-thrift-connector', - 'connector.ip' = '127.0.0.1', - 'connector.port' = '12345' - ) -``` -该语句将会启动向 127.0.0.1:12345 端口的全量同步,将以 tsfile 的方式备份,有着极高的备份传输性能。
