This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch rc/2.0.6
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 155789d75b211ce72120c72060b4dadba9363aa8
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 17:26:20 2026 +0800

    File sink
---
 .../dataregion/PipeDataRegionSinkConstructor.java  |   4 +
 .../iotdb/db/pipe/sink/protocol/ToFileSink.java    | 369 +++++++++++++++++++++
 .../agent/plugin/builtin/BuiltinPipePlugin.java    |   2 +
 3 files changed, 375 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
index 09773d0cad5..58d72295ef1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.db.pipe.sink.protocol.ToFileSink;
 import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
@@ -62,6 +63,8 @@ class PipeDataRegionSinkConstructor extends 
PipeSinkConstructor {
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName(), 
WriteBackSink::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.TO_FILE_CONNECTOR.getPipePluginName(), 
ToFileSink::new);
 
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), 
IoTDBDataRegionAsyncSink::new);
@@ -85,5 +88,6 @@ class PipeDataRegionSinkConstructor extends 
PipeSinkConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(),
         PipeConsensusAsyncSink::new);
+    pluginConstructors.put(BuiltinPipePlugin.TO_FILE_SINK.getPipePluginName(), 
ToFileSink::new);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/ToFileSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/ToFileSink.java
new file mode 100644
index 00000000000..f5b2d4111a4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/ToFileSink.java
@@ -0,0 +1,369 @@
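+/*
+ * ToFileSink: a custom sink plugin for the IoTDB pipe framework.
+ *
+ * Writes the data delivered by a pipe as TsFiles into a local directory, inside a sub-directory
+ * named after the export date (and, when 'sink.create-subdirs'='true', below a per-database
+ * sub-directory).
+ * - TabletInsertionEvents are batched and written as TsFiles grouped by database and export date
+ *   (all devices of one database share a file).
+ * - TsFileInsertionEvents first flush the pending batch, are then converted into
+ *   TabletInsertionEvents and written the same way; the TsFile itself is not copied or split,
+ *   which avoids the GC pressure of rewriting large files.
+ *
+ * Usage example:
+ * CREATE PIPE pipe1
+ * WITH SINK (
+ *   'sink'='to-file-sink',
+ *   'sink.directory'='/path/to/save/tsfiles'
+ * );
+ */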
+
+package org.apache.iotdb.db.pipe.sink.protocol;
+
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.annotation.TableModel;
+import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Sink that persists pipe data as TsFiles in a local directory.
+ *
+ * <p>All data goes through a {@link PipeTabletEventTsFileBatch}; only {@link
+ * PipeInsertNodeTabletInsertionEvent} and {@link PipeRawTabletInsertionEvent} are accepted, other
+ * tablet event types are logged and skipped. When the batch reports {@code shouldEmit()} it is
+ * sealed into TsFiles (one per database) which are copied to {@code <sink.directory>/<yyyyMMdd>/}
+ * (or {@code <sink.directory>/<database>/<yyyyMMdd>/} when {@code sink.create-subdirs} is true).
+ * Non-data events such as heartbeats re-check {@code shouldEmit()}, so a batch whose delay has
+ * elapsed is written out even when no new data arrives.
+ */
+@TreeModel
+@TableModel
+public class ToFileSink implements PipeConnector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ToFileSink.class);
+
+  // Parameter keys: both the "sink.*" form and the legacy "connector.*" form are accepted.
+  private static final String SINK_DIRECTORY_KEY = "sink.directory";
+  private static final String CONNECTOR_DIRECTORY_KEY = "connector.directory";
+  private static final String SINK_CREATE_SUBDIRS_KEY = "sink.create-subdirs";
+  private static final String CONNECTOR_CREATE_SUBDIRS_KEY = "connector.create-subdirs";
+
+  /** Batching: maximum delay (ms) after which a non-empty batch should be flushed. */
+  private static final String SINK_BATCH_DELAY_MS_KEY = "sink.batch-delay-ms";
+
+  private static final String CONNECTOR_BATCH_DELAY_MS_KEY = "connector.batch-delay-ms";
+
+  /** Batching: buffered size (bytes) at which the batch should be flushed. */
+  private static final String SINK_BATCH_SIZE_BYTES_KEY = "sink.batch-size-bytes";
+
+  private static final String CONNECTOR_BATCH_SIZE_BYTES_KEY = "connector.batch-size-bytes";
+
+  /**
+   * Kept small on purpose: with many data regions the buffered tablets count against pipe tablet
+   * memory, and flushing earlier releases that memory earlier.
+   */
+  private static final int DEFAULT_BATCH_DELAY_MS = 500;
+
+  private static final long DEFAULT_BATCH_SIZE_BYTES = 20L * 1024 * 1024;
+
+  /** Name format of the per-day sub-directory. */
+  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+  /** File-name timestamp with millisecond precision. */
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
+
+  // Configuration, set in customize().
+  private String targetDirectory;
+  private boolean createSubdirs = false;
+  private String pipeName;
+
+  /**
+   * Per-instance tag appended to file names so that several instances of this sink (e.g. when
+   * sink.parallel.tasks > 1) writing into the same directory do not produce the same file name.
+   */
+  private String instanceId;
+
+  /** The only write path: tablets are buffered here and sealed into TsFiles. Null after close. */
+  private PipeTabletEventTsFileBatch tsFileBatch;
+
+  /** Serializes flushing and closing of {@link #tsFileBatch}. */
+  private final Object batchFlushLock = new Object();
+
+  /** Per-instance flush counter so that flushes within the same millisecond get distinct names. */
+  private final AtomicLong flushSequence = new AtomicLong(0);
+
+  /** Checks that a target directory is configured and, if it already exists, is a directory. */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    final PipeParameters parameters = validator.getParameters();
+
+    final String dir =
+        parameters.getStringOrDefault(
+            Arrays.asList(SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY), null);
+    validator.validate(
+        arg -> {
+          final String directory = (String) arg;
+          return directory != null && !directory.trim().isEmpty();
+        },
+        String.format(
+            "The parameter %s or %s must be specified and not empty",
+            SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY),
+        dir);
+
+    // A missing directory is fine (handshake() creates it); an existing non-directory is not.
+    if (dir != null) {
+      final Path path = Paths.get(dir);
+      if (path.toFile().exists() && !path.toFile().isDirectory()) {
+        throw new PipeException(String.format("The path %s exists but is not a directory", dir));
+      }
+    }
+  }
+
+  /** Reads the sink parameters and creates the TsFile batch. */
+  @Override
+  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    targetDirectory =
+        parameters.getStringOrDefault(
+            Arrays.asList(SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY), null);
+    if (targetDirectory == null || targetDirectory.trim().isEmpty()) {
+      throw new PipeException(
+          String.format(
+              "The parameter %s or %s must be specified",
+              SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY));
+    }
+    // Keep the absolute, normalized form so that logs and resolved paths are unambiguous.
+    targetDirectory = Paths.get(targetDirectory).toAbsolutePath().normalize().toString();
+
+    createSubdirs =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(SINK_CREATE_SUBDIRS_KEY, CONNECTOR_CREATE_SUBDIRS_KEY), false);
+
+    pipeName = configuration.getRuntimeEnvironment().getPipeName();
+    // floorMod keeps the suffix non-negative: System.nanoTime() may be negative, and a negative
+    // remainder would be rendered by toHexString as a 16-digit two's-complement string.
+    instanceId =
+        Integer.toHexString(System.identityHashCode(this))
+            + "_"
+            + Long.toHexString(Math.floorMod(System.nanoTime(), 0xFFFFL));
+
+    final int batchDelayMs =
+        parameters.getIntOrDefault(
+            Arrays.asList(SINK_BATCH_DELAY_MS_KEY, CONNECTOR_BATCH_DELAY_MS_KEY),
+            DEFAULT_BATCH_DELAY_MS);
+    final long batchSizeBytes =
+        parameters.getLongOrDefault(
+            Arrays.asList(SINK_BATCH_SIZE_BYTES_KEY, CONNECTOR_BATCH_SIZE_BYTES_KEY),
+            DEFAULT_BATCH_SIZE_BYTES);
+    tsFileBatch = new PipeTabletEventTsFileBatch(batchDelayMs, batchSizeBytes);
+
+    LOGGER.info(
+        "ToFileSink customized: targetDirectory={}, createSubdirs={}, pipeName={}, instanceId={}, batchDelayMs={}, batchSizeBytes={}",
+        targetDirectory,
+        createSubdirs,
+        pipeName,
+        instanceId,
+        batchDelayMs,
+        batchSizeBytes);
+  }
+
+  /** Creates the target directory if needed and checks that it is writable. */
+  @Override
+  public void handshake() throws Exception {
+    final Path targetPath = Paths.get(targetDirectory);
+    if (!Files.exists(targetPath)) {
+      Files.createDirectories(targetPath);
+      LOGGER.info("Created target directory: {}", targetDirectory);
+    }
+
+    if (!Files.isWritable(targetPath)) {
+      throw new PipeException(
+          String.format("Target directory is not writable: %s", targetDirectory));
+    }
+
+    LOGGER.info("ToFileSink handshake completed: {}", targetDirectory);
+  }
+
+  /** Fails with a connection exception if the target directory vanished or became read-only. */
+  @Override
+  public void heartbeat() throws Exception {
+    final Path targetPath = Paths.get(targetDirectory);
+    if (!Files.exists(targetPath)) {
+      throw new PipeConnectionException(
+          String.format("Target directory does not exist: %s", targetDirectory));
+    }
+    if (!Files.isWritable(targetPath)) {
+      throw new PipeConnectionException(
+          String.format("Target directory is not writable: %s", targetDirectory));
+    }
+  }
+
+  /**
+   * Buffers a tablet event and flushes the batch when it is due.
+   *
+   * <p>Flow: (1) {@code onEvent} adds the event to the in-memory batch; (2) {@code shouldEmit}
+   * reports whether the buffered size or the batch age exceeds its limit; (3) if so, the batch is
+   * sealed into TsFiles and copied out; (4) {@code onSuccess} is called only after that succeeded.
+   *
+   * @throws PipeException wrapping any failure while buffering or flushing
+   */
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "ToFileSink only supports PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent, ignore {}",
+          tabletInsertionEvent.getClass().getSimpleName());
+      return;
+    }
+    try {
+      tsFileBatch.onEvent(tabletInsertionEvent);
+      if (tsFileBatch.shouldEmit()) {
+        flushBatchToTargetDirectory();
+      }
+    } catch (WALPipeException | IOException e) {
+      throw new PipeException("ToFileSink: TsFile batch onEvent failed", e);
+    } catch (WriteProcessException e) {
+      // Wrapped like the other transfer() variants instead of escaping as a raw checked exception.
+      throw new PipeException("ToFileSink: TsFile batch flush failed", e);
+    }
+  }
+
+  /**
+   * Flushes the pending batch, then converts the TsFile into tablet events and buffers them.
+   *
+   * <p>Flushing first releases the tablet memory held by the batch; per the original author, the
+   * parse step ({@code toTabletInsertionEvents()}) otherwise may wait for memory that the batch
+   * still holds and time out.
+   */
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+    try {
+      // No-op when there is no batch or it is empty; the check happens under the lock.
+      flushBatchToTargetDirectory();
+    } catch (IOException | WriteProcessException e) {
+      throw new PipeException("ToFileSink: flush before TsFile parse failed", e);
+    }
+    try {
+      for (final TabletInsertionEvent tabletInsertionEvent :
+          tsFileInsertionEvent.toTabletInsertionEvents()) {
+        transfer(tabletInsertionEvent);
+      }
+    } finally {
+      tsFileInsertionEvent.close();
+    }
+  }
+
+  /**
+   * Handles non-data events (e.g. heartbeats): if the batch is non-empty and {@code shouldEmit()}
+   * is true, it is sealed, copied out and acknowledged via {@code onSuccess}.
+   */
+  @Override
+  public void transfer(Event event) throws Exception {
+    synchronized (batchFlushLock) {
+      // Checked under the lock so that a concurrent close() cannot null the batch in between.
+      if (tsFileBatch != null && !tsFileBatch.isEmpty() && tsFileBatch.shouldEmit()) {
+        try {
+          flushBatchToTargetDirectory();
+        } catch (IOException | WriteProcessException e) {
+          throw new PipeException("ToFileSink: heartbeat-triggered batch flush failed", e);
+        }
+      }
+    }
+    LOGGER.debug("ToFileSink received event: {}", event.getClass().getSimpleName());
+  }
+
+  /** Flushes whatever is still buffered (best effort), then closes and releases the batch. */
+  @Override
+  public void close() throws Exception {
+    synchronized (batchFlushLock) {
+      if (tsFileBatch != null) {
+        try {
+          flushBatchToTargetDirectory();
+        } catch (Exception e) {
+          LOGGER.warn(
+              "ToFileSink: flush remaining TsFile batch on close failed: {}", e.getMessage(), e);
+        }
+        try {
+          tsFileBatch.close();
+        } catch (Exception e) {
+          LOGGER.warn("ToFileSink: close TsFile batch failed: {}", e.getMessage(), e);
+        }
+        tsFileBatch = null;
+      }
+    }
+
+    LOGGER.info("ToFileSink closed: {}", targetDirectory);
+  }
+
+  /**
+   * Seals the current batch into TsFiles, copies each one to the target directory (layout
+   * {@code [<database>/]<yyyyMMdd>/}), deletes the temporary source files, releases the events'
+   * references and finally calls {@code onSuccess}. Does nothing when there is no batch or it is
+   * empty.
+   *
+   * <p>If a copy fails the exception propagates and {@code onSuccess} is not called.
+   * NOTE(review): {@code sealTsFiles()} has already run at that point; confirm the batch can be
+   * retried safely and that already-copied files are acceptable duplicates.
+   *
+   * @throws IOException if creating a directory or copying a file fails
+   * @throws WriteProcessException if sealing the batch into TsFiles fails
+   */
+  private void flushBatchToTargetDirectory() throws IOException, WriteProcessException {
+    synchronized (batchFlushLock) {
+      // Checked under the lock: close() sets tsFileBatch to null while holding it.
+      if (tsFileBatch == null || tsFileBatch.isEmpty()) {
+        return;
+      }
+      final List<Pair<String, File>> dbTsFilePairs = tsFileBatch.sealTsFiles();
+      if (dbTsFilePairs.isEmpty()) {
+        tsFileBatch.decreaseEventsReferenceCount(
+            PipeTransferBatchReqBuilder.class.getName(), false);
+        // Nothing was produced, but the batch state must still be reset.
+        tsFileBatch.onSuccess();
+        return;
+      }
+
+      // A single clock reading keeps the date directory and the file-name timestamp consistent,
+      // even when a flush straddles midnight.
+      final Instant now = Instant.now();
+      final ZoneId zone = ZoneId.systemDefault();
+      final LocalDate day = now.atZone(zone).toLocalDate();
+      final String date = day.format(DATE_FORMAT);
+      final String createTime = now.atZone(zone).format(DATETIME_FORMAT);
+      final long seq = flushSequence.getAndIncrement();
+
+      int index = 0;
+      for (final Pair<String, File> pair : dbTsFilePairs) {
+        final String dbName = pair.left == null ? "" : pair.left;
+        final File srcFile = pair.right;
+        final String safeDbName = toSafePathComponent(dbName);
+
+        Path basePath = Paths.get(targetDirectory);
+        if (createSubdirs && !safeDbName.isEmpty()) {
+          basePath = basePath.resolve(safeDbName);
+        }
+        final Path dateDir = basePath.resolve(date);
+        Files.createDirectories(dateDir);
+
+        // Timestamp + flush sequence + index (+ instance id) keeps names unique across flushes,
+        // files of one flush, and parallel sink instances.
+        final String fileName =
+            instanceId == null
+                ? String.format("tablet-%s-%s-%d-%d.tsfile", safeDbName, createTime, seq, index)
+                : String.format(
+                    "tablet-%s-%s-%d-%d-%s.tsfile", safeDbName, createTime, seq, index, instanceId);
+        final Path destPath = dateDir.resolve(fileName);
+        Files.copy(srcFile.toPath(), destPath, StandardCopyOption.REPLACE_EXISTING);
+
+        // Best effort: a leftover temp file is logged, not treated as a failed flush.
+        try {
+          if (!srcFile.delete()) {
+            LOGGER.warn("ToFileSink: could not delete batch temp file {}", srcFile);
+          }
+        } catch (SecurityException e) {
+          LOGGER.warn(
+              "ToFileSink: failed to delete batch temp file {}: {}", srcFile, e.getMessage(), e);
+        }
+        LOGGER.debug("ToFileSink: batch TsFile copied to {}", destPath);
+        index++;
+      }
+
+      // Release the events' references so their tablet memory is returned right away.
+      tsFileBatch.decreaseEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+      // Acknowledge the batch only after every file was copied successfully.
+      tsFileBatch.onSuccess();
+      LOGGER.info(
+          "ToFileSink: flushed TsFile batch, {} file(s) copied to {}",
+          dbTsFilePairs.size(),
+          targetDirectory);
+    }
+  }
+
+  /**
+   * Replaces characters that are unsafe inside a single file or directory name ('.', ' ', '/',
+   * '\\', ':') with '_', so a database name can neither add path levels nor break the name.
+   */
+  private static String toSafePathComponent(final String name) {
+    return name.replace('.', '_')
+        .replace(' ', '_')
+        .replace('/', '_')
+        .replace('\\', '_')
+        .replace(':', '_');
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index daab48bbe23..d0087160e3e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -69,6 +69,7 @@ public enum BuiltinPipePlugin {
 
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
   WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class),
+  TO_FILE_CONNECTOR("to-file-connector", WriteBackSink.class),
 
   DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class),
   IOTDB_THRIFT_SINK("iotdb-thrift-sink", IoTDBThriftSink.class),
@@ -80,6 +81,7 @@ public enum BuiltinPipePlugin {
   WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
   SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
   PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", 
PipeConsensusAsyncSink.class),
+  TO_FILE_SINK("to-file-sink", WriteBackSink.class),
   ;
 
   private final String pipePluginName;

Reply via email to