jt2594838 opened a new pull request, #11580:
URL: https://github.com/apache/iotdb/pull/11580
### Introduction
This PR introduces Local Split, an optimized procedure for syncing
**historical TsFiles** from one IoTDB instance to another with Pipe. Here is a
brief introduction to Local Split and a comparison with the old procedure
(called Basic).
1. Event generation is batched with a BatchedTsFileExtractor, i.e., it
generates PipeBatchTsFileInsertionEvents that each contain more than one
TsFile. The number of events is thus reduced, and with multiple files in the
same event, there are more tuning opportunities during the later transfer. It
also reduces network traffic when individual TsFiles are small. The size of a
batch is controlled by two factors: the total size of the files (in bytes) and
the number of files.
If the total size is so large that timeouts may be triggered, a feedback
mechanism collects the throughput of normal transfers. After a timeout, it
adjusts the total file size cap to the product of the average throughput and
the timeout length. The recorded throughput history is purged statistically to
distinguish timeouts triggered by GC stalls from those caused by overly large
files (a sketch of this feedback loop appears after this list).
2. TsFiles are transferred series-by-series instead of file-by-file. TsFiles
in a batch are merge-read, exploiting the sorted order of timeseries within
each file. Chunks of the same timeseries are collected and sent to the
receiver, resulting in only one file on the receiver for a batched event. This
increases the locality of a timeseries just like chunk merging in compaction,
but it does not decompress or decode chunks unless necessary; hence it is
called Shallow Merge (sketched after this list). Moreover, as the number of
files on the receiver is reduced, so are the higher-level metadata and
statistics, which may result in faster aggregation.
3. Chunks are further compressed. Although each chunk is already compressed by
default, with the Shallow Merge described above, chunks of the same timeseries
are sent in one batch, creating opportunities for further compression. Chunks
in a batch are compressed with LZ4 by default to reduce network bandwidth
consumption (see the compression sketch after this list).
4. Local Split avoids chunk forwarding. In Basic, the receiver may not be
the actual data holder; it must forward data to the holders. In Local Split,
the sender queries the partition info from the receiver, performs the chunk
split locally, and sends chunks directly to the data holders. This avoids the
potential coordinator bottleneck and unnecessary traffic.
Considering that cross-cluster bandwidth can be scarce, the sender does not
send chunks to all replicas of the receiver. Instead, it sends to only one
replica and lets that replica forward to the others. A throughput-based method
is applied to select the best relay among the replicas (a relay-selection
sketch follows this list).
5. Grouped and parallel chunk sending. For non-aligned timeseries, the order
of chunks within a device can be arbitrary. This enables parallel processing
when sending chunks of the same device: the chunks within a device are grouped
and sent to the receiver in parallel to maximize resource utilization.
Chunks are grouped by the similarity of their timeseries, considering the
measurementId, data type, and samples from the data. Since groups are
compressed as described above, putting similar timeseries together helps
achieve a higher compression ratio (see the grouping sketch after this list).
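The feedback mechanism in item 1 can be summarized with a minimal sketch. The class and member names below (`BatchSizeFeedback`, `onTimeout`, the history limit, and so on) are illustrative stand-ins rather than the PR's actual code; the core idea is that, after a timeout, the byte cap is clamped to average throughput × timeout length.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of the batch-size feedback loop; all names are hypothetical.
public class BatchSizeFeedback {
  private final Deque<Double> throughputHistory = new ArrayDeque<>(); // bytes per ms
  private static final int HISTORY_LIMIT = 32; // window size is an assumption
  private long maxBatchSizeInBytes;

  public BatchSizeFeedback(long initialMaxBatchSizeInBytes) {
    this.maxBatchSizeInBytes = initialMaxBatchSizeInBytes;
  }

  // Record the throughput of a successful (non-timeout) transfer.
  public void onSuccess(long transferredBytes, long elapsedMs) {
    if (throughputHistory.size() == HISTORY_LIMIT) {
      throughputHistory.pollFirst();
    }
    throughputHistory.addLast(transferredBytes / (double) Math.max(1, elapsedMs));
    // The PR also purges statistical outliers (e.g. samples distorted by
    // GC stalls) from this history; that step is omitted in this sketch.
  }

  // After a timeout, cap the batch size at averageThroughput * timeout,
  // so the next batch should fit within the timeout window.
  public void onTimeout(long timeoutMs) {
    double avgBytesPerMs =
        throughputHistory.stream().mapToDouble(Double::doubleValue).average().orElse(0);
    if (avgBytesPerMs > 0) {
      maxBatchSizeInBytes = Math.min(maxBatchSizeInBytes, (long) (avgBytesPerMs * timeoutMs));
    }
  }

  public long getMaxBatchSizeInBytes() {
    return maxBatchSizeInBytes;
  }
}
```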
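Item 2, Shallow Merge, amounts to a merge-read that regroups raw chunk bytes. The `ChunkHandle`/`FileReader`/`FileWriter` types below are hypothetical stand-ins, not the real TsFile API; what matters is that chunk bytes are copied verbatim, so nothing is decompressed or decoded on the sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of Shallow Merge over assumed reader/writer abstractions.
public final class ShallowMergeSketch {
  interface ChunkHandle {
    String timeseriesPath();
    byte[] rawCompressedBytes(); // still-compressed chunk payload
  }

  interface FileReader {
    List<ChunkHandle> chunks(); // chunks come out sorted by timeseries path
  }

  interface FileWriter {
    void appendRawChunk(String timeseriesPath, byte[] rawBytes);
  }

  // Merge-read a batch of TsFiles into one logical output file.
  static void shallowMerge(List<FileReader> batch, FileWriter out) {
    // Gather chunks of the same timeseries across all files in the batch.
    Map<String, List<ChunkHandle>> chunksByPath = new TreeMap<>();
    for (FileReader reader : batch) {
      for (ChunkHandle chunk : reader.chunks()) {
        chunksByPath.computeIfAbsent(chunk.timeseriesPath(), p -> new ArrayList<>()).add(chunk);
      }
    }
    // Write same-series chunks contiguously. Bytes are copied as-is: this is
    // why the merge is "shallow" and skips decompression and decoding.
    for (Map.Entry<String, List<ChunkHandle>> entry : chunksByPath.entrySet()) {
      for (ChunkHandle chunk : entry.getValue()) {
        out.appendRawChunk(entry.getKey(), chunk.rawCompressedBytes());
      }
    }
  }
}
```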
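For item 3, the batch-level re-compression can be sketched with the lz4-java library; the class and method names here are illustrative, and whether the PR compresses at exactly this granularity is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

// Sketch: concatenate same-series chunk bytes and LZ4-compress the batch.
public final class ChunkBatchCompressor {
  private static final LZ4Compressor COMPRESSOR =
      LZ4Factory.fastestInstance().fastCompressor();

  static byte[] compressBatch(List<byte[]> sameSeriesChunks) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    for (byte[] chunk : sameSeriesChunks) {
      // Chunks of one timeseries share encodings and value patterns,
      // so compressing them together yields a better ratio than one-by-one.
      buffer.write(chunk);
    }
    return COMPRESSOR.compress(buffer.toByteArray());
  }
}
```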
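Item 4's relay selection can be illustrated as follows; `RelaySelector`, the EMA smoothing, and the probing rule are assumptions standing in for whatever throughput-based policy the PR implements.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: keep an exponential moving average of per-replica throughput and
// route each chunk batch to the best-performing replica, which then forwards
// the data to its peers inside the receiver cluster.
public class RelaySelector {
  private final Map<String, Double> emaBytesPerMs = new ConcurrentHashMap<>();
  private static final double ALPHA = 0.2; // smoothing factor (assumed)

  public void record(String endpoint, long bytes, long elapsedMs) {
    double sample = bytes / (double) Math.max(1, elapsedMs);
    emaBytesPerMs.merge(endpoint, sample, (old, s) -> ALPHA * s + (1 - ALPHA) * old);
  }

  // Endpoints never measured default to MAX_VALUE so that each replica gets
  // probed at least once before the EMA ranking takes over.
  public String pickRelay(List<String> replicaEndpoints) {
    return replicaEndpoints.stream()
        .max(Comparator.comparingDouble(e -> emaBytesPerMs.getOrDefault(e, Double.MAX_VALUE)))
        .orElseThrow(() -> new IllegalArgumentException("no replica endpoints"));
  }
}
```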
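Finally, item 5's grouping and parallel send might look like the sketch below. The similarity key here only uses measurementId and data type (the PR additionally samples the data), and the thread-pool size is an arbitrary placeholder.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Sketch: bucket a device's chunks by a similarity key, then send each
// bucket on its own thread (valid because non-aligned chunks of a device
// may arrive in any order).
public final class GroupedChunkSender {
  record ChunkMeta(String measurementId, String dataType, byte[] payload) {}

  private final ExecutorService pool = Executors.newFixedThreadPool(4); // assumed parallelism

  void send(List<ChunkMeta> deviceChunks) {
    // Similar series land in the same group, so the per-group LZ4 pass
    // (see the compression sketch above) sees more redundancy.
    Map<String, List<ChunkMeta>> groups = deviceChunks.stream()
        .collect(Collectors.groupingBy(c -> c.measurementId() + "/" + c.dataType()));
    groups.values().forEach(group -> pool.submit(() -> transfer(group)));
  }

  private void transfer(Collection<ChunkMeta> group) {
    // compress the group and ship it to the selected relay (omitted)
  }
}
```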
### How to Use
To use the Local Split feature in a Pipe, the user should add four configs
when creating a pipe, like the following:
`create pipe a2b with extractor ('extractor.local-split.enable'='true') with connector ('connector.local-split.enable'='true', 'connector.external.config-nodes'='nelbds-15:11710', 'connector'='iotdb-thrift-connector', 'connector.ip'='nelbds-16', 'connector.port'='7667')`
Both 'extractor.local-split.enable' and 'connector.local-split.enable' must
be set to 'true' to enable the feature.
'connector.external.config-nodes' should be the IP and port of the
receiver's config node.
'connector'='iotdb-thrift-connector' is required, as only
'iotdb-thrift-connector' currently supports the feature.
As implied by Local Split, the sender must have direct access to all nodes
of the receiver. In other words, the sender should be on the firewall
whitelist.
Other configs are:
- 'extractor.split.max-concurrent-file': an integer specifying the maximum
number of files in an event.
- 'extractor.split.max-file-batch-size': an integer specifying the maximum
total size of the files in an event, in bytes. The feedback mechanism may
reduce this after a timeout.
- 'connector.split.max-size': an integer specifying the maximum number of
bytes to transfer in one request.
- 'connector.split.max-concurrent-file': an integer specifying the maximum
number of files to be Shallow Merged; it should be less than or equal to
'extractor.split.max-concurrent-file'.
- 'connector.external.user-name': the user name to use on the receiver side.
- 'connector.external.password': the password to use on the receiver side.
### Evaluation
To perform the evaluation, around 800 GB of data is first written to a 1C1D
instance and then synchronized to a 1C3D instance through a Pipe. In the
figures below, the number of replicas is the x-axis.
Below is the total task completion time:

The results show a 50% reduction in completion time compared with Basic.
Below is the query latency of aggregating a single timeseries:

Thanks to Shallow Merge, the increased locality and reduced metadata speed
up such queries.