This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 94500581f6 [IOTDB-3224][IOTDB-3949] Sync pipe execution and data collection process in standalone version (#7154)
94500581f6 is described below

commit 94500581f6ceb8939287c184fb992d39aa4fd55d
Author: Chen YZ <[email protected]>
AuthorDate: Mon Sep 5 15:35:56 2022 +0800

    [IOTDB-3224][IOTDB-3949] Sync pipe execution and data collection process in standalone version (#7154)
---
 .../{IoTDBPipeSinkIT.java => IoTDBPipeIT.java}     |  82 ++++++++----
 .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java   |   4 +-
 .../db/integration/sync/IoTDBSyncSenderIT.java     |   1 +
 .../apache/iotdb/commons/sync/SyncConstant.java    |   4 +-
 .../apache/iotdb/db/engine/StorageEngineV2.java    |   6 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  21 +--
 .../db/engine/storagegroup/TsFileManager.java      |  14 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  24 ++--
 .../dataregion/StorageGroupManager.java            |  10 --
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  65 ++++++---
 .../schemaregion/SchemaRegionMemoryImpl.java       |  11 --
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |  11 --
 .../db/mpp/common/header/ColumnHeaderConstant.java |  17 +++
 .../db/mpp/common/header/DatasetHeaderFactory.java |   4 +
 .../config/executor/ClusterConfigTaskExecutor.java |  15 ++-
 .../config/executor/IConfigTaskExecutor.java       |  15 ++-
 .../executor/StandaloneConfigTaskExecutor.java     |  74 +++++-----
 .../execution/config/sys/sync/CreatePipeTask.java  |   4 +-
 .../execution/config/sys/sync/DropPipeTask.java    |   4 +-
 .../execution/config/sys/sync/ShowPipeTask.java    |  41 +++++-
 .../execution/config/sys/sync/StartPipeTask.java   |   4 +-
 .../execution/config/sys/sync/StopPipeTask.java    |   4 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |   2 +
 .../statement/sys/sync/CreatePipeStatement.java    |  38 ++++++
 .../java/org/apache/iotdb/db/sync/SyncService.java |  95 +++++++++++++
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   5 +-
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  19 ++-
 .../org/apache/iotdb/db/sync/common/SyncInfo.java  |  35 ++++-
 .../db/sync/common/persistence/SyncLogReader.java  |   8 +-
 .../db/sync/common/persistence/SyncLogWriter.java  |  17 ++-
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |  54 ++++----
 .../sender/manager/ISyncManager.java}              |  34 ++---
 .../db/sync/sender/manager/LocalSyncManager.java   |  90 +++++++++++++
 .../db/sync/sender/manager/SchemaSyncManager.java  | 149 ---------------------
 .../db/sync/sender/manager/TsFileSyncManager.java  | 106 ---------------
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |  12 ++
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      | 144 ++++++++------------
 .../db/sync/transport/client/SenderManager.java    |   1 -
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |  47 +++++++
 .../engine/storagegroup/TsFileProcessorTest.java   |   6 +-
 .../db/sync/receiver/manager/SyncInfoTest.java     |   8 +-
 .../db/sync/receiver/recovery/SyncLogTest.java     |   8 +-
 .../src/main/thrift/confignode.thrift              |  15 +++
 43 files changed, 757 insertions(+), 571 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
 b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
similarity index 56%
copy from 
integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
copy to 
integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
index 51dbcf99de..96907efc55 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -36,9 +37,11 @@ import java.sql.Statement;
 
 import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual;
 
+// TODO: this test only support for new standalone now
+@Ignore
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class})
-public class IoTDBPipeSinkIT {
+public class IoTDBPipeIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -51,44 +54,54 @@ public class IoTDBPipeSinkIT {
   }
 
   @Test
-  public void testShowPipeSinkType() {
+  public void testOperatePipe() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
-      String expectedHeader = ColumnHeaderConstant.COLUMN_PIPESINK_TYPE + ",";
-      String[] expectedRetSet = new String[] {"IoTDB,", "ExternalPipe,"};
-      try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINKTYPE")) {
-        assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
-  }
-
-  @Test
-  public void testOperatePipeSink() {
-    try (Connection connection = EnvFactory.getEnv().getConnection();
-        Statement statement = connection.createStatement()) {
-      statement.execute("CREATE PIPESINK demo1 AS IoTDB 
(ip='192.168.0.1',port='6677');");
-      statement.execute("CREATE PIPESINK demo2 AS IoTDB 
(ip='192.168.0.2',port='6678');");
-      statement.execute("CREATE PIPESINK demo3 AS IoTDB;");
-      statement.execute("DROP PIPESINK demo2;");
+      String ip = EnvFactory.getEnv().getDataNodeWrapperList().get(0).getIp();
+      int port = EnvFactory.getEnv().getDataNodeWrapperList().get(0).getPort();
+      statement.execute(
+          String.format("CREATE PIPESINK demo AS IoTDB (ip='%s',port='%d');", 
ip, port));
+      statement.execute("CREATE PIPE p to demo;");
       String expectedHeader =
-          ColumnHeaderConstant.COLUMN_PIPESINK_NAME
+          ColumnHeaderConstant.COLUMN_PIPE_CREATE_TIME
+              + ","
+              + ColumnHeaderConstant.COLUMN_PIPE_NAME
+              + ","
+              + ColumnHeaderConstant.COLUMN_PIPE_ROLE
+              + ","
+              + ColumnHeaderConstant.COLUMN_PIPE_REMOTE
               + ","
-              + ColumnHeaderConstant.COLUMN_PIPESINK_TYPE
+              + ColumnHeaderConstant.COLUMN_PIPE_STATUS
               + ","
-              + ColumnHeaderConstant.COLUMN_PIPESINK_ATTRIBUTES
+              + ColumnHeaderConstant.COLUMN_PIPE_MESSAGE
               + ",";
-      try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK")) {
+
+      String createTime = getCreateTime("p");
+      try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
+        String[] expectedRetSet =
+            new String[] {String.format("%s,p,sender,demo,STOP,,", 
createTime)};
+        assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
+      }
+      statement.execute("START PIPE p;");
+      Thread.sleep(1000); // wait 1000 ms to start thread
+      try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
         String[] expectedRetSet =
             new String[] {
-              "demo3,IoTDB,ip='127.0.0.1',port=6670,", 
"demo1,IoTDB,ip='192.168.0.1',port=6677,"
+              String.format("%s,p,sender,demo,RUNNING,,", createTime),
+              String.format("%s,p,receiver,0.0.0.0,RUNNING,,", createTime),
             };
         assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
       }
-      try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK 
demo3")) {
-        String[] expectedRetSet = new String[] 
{"demo3,IoTDB,ip='127.0.0.1',port=6670,"};
+      statement.execute("STOP PIPE p;");
+      try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
+        String[] expectedRetSet =
+            new String[] {String.format("%s,p,sender,demo,STOP,,", 
createTime)};
+        assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
+      }
+      statement.execute("DROP PIPE p;");
+      try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
+        String[] expectedRetSet =
+            new String[] {String.format("%s,p,sender,demo,DROP,,", 
createTime)};
         assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
       }
     } catch (Exception e) {
@@ -96,4 +109,17 @@ public class IoTDBPipeSinkIT {
       Assert.fail();
     }
   }
+
+  private String getCreateTime(String pipeName) throws Exception {
+    String createTime = "";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery("SHOW PIPE " + 
pipeName)) {
+        Assert.assertTrue(resultSet.next());
+        createTime = resultSet.getString(1);
+      }
+    }
+    Assert.assertNotEquals("", createTime);
+    return createTime;
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
index 51dbcf99de..573c2dadf4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
@@ -83,12 +83,12 @@ public class IoTDBPipeSinkIT {
       try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK")) {
         String[] expectedRetSet =
             new String[] {
-              "demo3,IoTDB,ip='127.0.0.1',port=6670,", 
"demo1,IoTDB,ip='192.168.0.1',port=6677,"
+              "demo3,IoTDB,ip='127.0.0.1',port=6667,", 
"demo1,IoTDB,ip='192.168.0.1',port=6677,"
             };
         assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
       }
       try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK 
demo3")) {
-        String[] expectedRetSet = new String[] 
{"demo3,IoTDB,ip='127.0.0.1',port=6670,"};
+        String[] expectedRetSet = new String[] 
{"demo3,IoTDB,ip='127.0.0.1',port=6667,"};
         assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
       }
     } catch (Exception e) {
diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index a5be8af83b..588db66902 100644
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@Ignore
 @Category({LocalStandaloneTest.class})
 public class IoTDBSyncSenderIT {
   private boolean enableSeqSpaceCompaction;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
index 81f54e7026..8e17263cfb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
@@ -26,6 +26,8 @@ public class SyncConstant {
 
   public static final String SYNC_SYS_DIR = "sys";
   public static final String FILE_DATA_DIR_NAME = "file-data";
+  public static final String ROLE_SENDER = "sender";
+  public static final String ROLE_RECEIVER = "receiver";
 
   // pipe log: serialNumber + SEPARATOR + SUFFIX
   public static final String PIPE_LOG_DIR_NAME = "pipe-log";
@@ -54,7 +56,7 @@ public class SyncConstant {
 
   // data config
   public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
-  public static final int DEFAULT_PIPE_SINK_PORT = 6670;
+  public static final int DEFAULT_PIPE_SINK_PORT = 6667;
 
   public static final Long HEARTBEAT_DELAY_SECONDS = 30L;
   public static final int CONNECT_TIMEOUT_MILLISECONDS = 1_000;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index c14ebd6a93..f50abbf5a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -50,6 +50,7 @@ import 
org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.db.wal.WALManager;
@@ -665,6 +666,7 @@ public class StorageEngineV2 implements IService {
               .deleteWALNode(
                   region.getStorageGroupName() + FILE_NAME_SEPARATOR + 
region.getDataRegionId());
         }
+        SyncService.getInstance().deleteSyncManager(region.getDataRegionId());
       } catch (Exception e) {
         logger.error(
             "Error occurs when deleting data region {}-{}",
@@ -682,6 +684,10 @@ public class StorageEngineV2 implements IService {
     return dataRegionMap.get(regionId);
   }
 
+  public List<DataRegion> getAllDataRegions() {
+    return new ArrayList<>(dataRegionMap.values());
+  }
+
   public List<DataRegionId> getAllDataRegionIds() {
     return new ArrayList<>(dataRegionMap.keySet());
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e58f55202e..7f959fb3fa 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -88,7 +88,8 @@ import org.apache.iotdb.db.service.SettleService;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.utils.UpgradeUtils;
@@ -275,9 +276,6 @@ public class DataRegion {
 
   private IDTable idTable;
 
-  /** used to collect TsFiles in this virtual storage group */
-  private TsFileSyncManager tsFileSyncManager = 
TsFileSyncManager.getInstance();
-
   /**
    * constrcut a storage group processor
    *
@@ -751,8 +749,9 @@ public class DataRegion {
     TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
     if (!recoverPerformer.canWrite()) {
       // cannot write, just close it
-      if (tsFileSyncManager.isEnableSync()) {
-        tsFileSyncManager.collectRealTimeTsFile(tsFileResource.getTsFile());
+      for (ISyncManager syncManager :
+          SyncService.getInstance().getOrCreateSyncManager(dataRegionId)) {
+        syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
       }
       try {
         tsFileResource.close();
@@ -2414,8 +2413,9 @@ public class DataRegion {
         tsFileResource.getProcessor().deleteDataInMemory(deletion, 
devicePaths);
       }
 
-      if (tsFileSyncManager.isEnableSync()) {
-        tsFileSyncManager.collectRealTimeDeletion(deletion, storageGroupName);
+      for (ISyncManager syncManager :
+          SyncService.getInstance().getOrCreateSyncManager(dataRegionId)) {
+        syncManager.syncRealTimeDeletion(deletion);
       }
 
       // add a record in case of rollback
@@ -3677,14 +3677,15 @@ public class DataRegion {
   /**
    * Used to collect history TsFiles(i.e. the tsfile whose memtable == null).
    *
+   * @param syncManager ISyncManager which invokes to collect history TsFile
    * @param dataStartTime only collect history TsFiles which contains the data 
after the
    *     dataStartTime
    * @return A list, which contains TsFile path
    */
-  public List<File> collectHistoryTsFileForSync(long dataStartTime) {
+  public List<File> collectHistoryTsFileForSync(ISyncManager syncManager, long 
dataStartTime) {
     writeLock("Collect data for sync");
     try {
-      return tsFileManager.collectHistoryTsFileForSync(dataStartTime);
+      return tsFileManager.collectHistoryTsFileForSync(syncManager, 
dataStartTime);
     } finally {
       writeUnlock();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index f09347c8d1..32c70b13da 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.db.exception.WriteLockFailedException;
 import org.apache.iotdb.db.rescon.TsFileResourceManager;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -373,12 +373,12 @@ public class TsFileManager {
     return unsequenceRecoverTsFileResources;
   }
 
-  public List<File> collectHistoryTsFileForSync(long dataStartTime) {
+  public List<File> collectHistoryTsFileForSync(ISyncManager syncManager, long 
dataStartTime) {
     readLock();
     try {
       List<File> historyTsFiles = new ArrayList<>();
-      collectTsFile(historyTsFiles, getTsFileList(true), dataStartTime);
-      collectTsFile(historyTsFiles, getTsFileList(false), dataStartTime);
+      collectTsFile(historyTsFiles, getTsFileList(true), syncManager, 
dataStartTime);
+      collectTsFile(historyTsFiles, getTsFileList(false), syncManager, 
dataStartTime);
       return historyTsFiles;
     } finally {
       readUnlock();
@@ -386,8 +386,10 @@ public class TsFileManager {
   }
 
   private void collectTsFile(
-      List<File> historyTsFiles, List<TsFileResource> tsFileResources, long 
dataStartTime) {
-    TsFileSyncManager syncManager = TsFileSyncManager.getInstance();
+      List<File> historyTsFiles,
+      List<TsFileResource> tsFileResources,
+      ISyncManager syncManager,
+      long dataStartTime) {
 
     for (TsFileResource tsFileResource : tsFileResources) {
       if (tsFileResource.getFileEndTime() < dataStartTime) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c1f7e98f5f..605cc979db 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -59,7 +59,8 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -166,9 +167,6 @@ public class TsFileProcessor {
   /** flush file listener */
   private List<FlushListener> flushListeners = new ArrayList<>();
 
-  /** used to collct this TsFile for sync */
-  private TsFileSyncManager tsFileSyncManager = 
TsFileSyncManager.getInstance();
-
   @SuppressWarnings("squid:S107")
   TsFileProcessor(
       String storageGroupName,
@@ -839,8 +837,10 @@ public class TsFileProcessor {
       if (!flushingMemTables.isEmpty()) {
         modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast()));
       }
-      if (tsFileSyncManager.isEnableSync()) {
-        tsFileSyncManager.collectRealTimeDeletion(deletion, storageGroupName);
+      for (ISyncManager syncManager :
+          SyncService.getInstance()
+              
.getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+        syncManager.syncRealTimeDeletion(deletion);
       }
     } finally {
       flushQueryLock.writeLock().unlock();
@@ -996,8 +996,10 @@ public class TsFileProcessor {
         // When invoke closing TsFile after insert data to memTable, we 
shouldn't flush until invoke
         // flushing memTable in System module.
         addAMemtableIntoFlushingList(tmpMemTable);
-        if (tsFileSyncManager.isEnableSync()) {
-          tsFileSyncManager.collectRealTimeTsFile(tsFileResource.getTsFile());
+        for (ISyncManager syncManager :
+            SyncService.getInstance()
+                
.getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+          syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
         }
         logger.info("Memtable {} has been added to flushing list", 
tmpMemTable);
         shouldClose = true;
@@ -1426,8 +1428,10 @@ public class TsFileProcessor {
     long closeStartTime = System.currentTimeMillis();
     writer.endFile();
     tsFileResource.serialize();
-    if (tsFileSyncManager.isEnableSync()) {
-      tsFileSyncManager.collectRealTimeResource(tsFileResource.getTsFile());
+    for (ISyncManager syncManager :
+        SyncService.getInstance()
+            
.getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+      syncManager.syncRealTimeResource(tsFileResource.getTsFile());
     }
     logger.info("Ended file {}", tsFileResource);
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
index e079e77289..35e55dea8a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -466,15 +465,6 @@ public class StorageGroupManager {
     }
   }
 
-  /** collect all tsfiles whose memtable == null for sync */
-  public List<File> collectHistoryTsFileForSync(long dataStartTime) {
-    List<File> historyTsFiles = new ArrayList<>();
-    for (DataRegion processor : this.dataRegion) {
-      
historyTsFiles.addAll(processor.collectHistoryTsFileForSync(dataStartTime));
-    }
-    return historyTsFiles;
-  }
-
   /** only for test */
   public void reset() {
     Arrays.fill(dataRegion, null);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index c6be69d323..25c1c9332d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -50,6 +50,9 @@ import 
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -65,6 +68,7 @@ import 
org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import 
org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -79,19 +83,17 @@ import 
org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
 import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.sync.SyncService;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -116,8 +118,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-
 /**
  * This class simulates the behaviour of configNode to manage the configs 
locally. The schema
  * configs include storage group, schema region and template. The data config 
is dataRegion.
@@ -274,10 +274,6 @@ public class LocalConfigNode {
       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
     }
 
-    if (SchemaSyncManager.getInstance().isEnableSync()) {
-      SchemaSyncManager.getInstance().syncMetadataPlan(new 
SetStorageGroupPlan(storageGroup));
-    }
-
     if (!config.isEnableMemControl()) {
       MemTableManager.getInstance().addOrDeleteStorageGroup(1);
     }
@@ -291,22 +287,12 @@ public class LocalConfigNode {
       dataPartitionTable.deleteStorageGroup(storageGroup);
     }
 
-    DeleteTimeSeriesPlan deleteTimeSeriesPlan =
-        SchemaSyncManager.getInstance().isEnableSync()
-            ? SchemaSyncManager.getInstance()
-                .splitDeleteTimeseriesPlanByDevice(
-                    storageGroup.concatNode(MULTI_LEVEL_PATH_WILDCARD))
-            : null;
-
     deleteSchemaRegionsInStorageGroup(
         storageGroup, 
schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
 
     for (Template template : templateManager.getTemplateMap().values()) {
       templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
     }
-    if (SchemaSyncManager.getInstance().isEnableSync()) {
-      SchemaSyncManager.getInstance().syncMetadataPlan(deleteTimeSeriesPlan);
-    }
 
     if (!config.isEnableMemControl()) {
       MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
@@ -1370,4 +1356,45 @@ public class LocalConfigNode {
       return Collections.singletonList(syncService.getPipeSink(pipeSinkName));
     }
   }
+
+  public TSStatus createPipe(CreatePipeStatement createPipeStatement) {
+    try {
+      syncService.addPipe(createPipeStatement);
+    } catch (PipeException e) {
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public TSStatus startPipe(String pipeName) {
+    try {
+      syncService.startPipe(pipeName);
+    } catch (PipeException e) {
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public TSStatus stopPipe(String pipeName) {
+    try {
+      syncService.stopPipe(pipeName);
+    } catch (PipeException e) {
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public TSStatus dropPipe(String pipeName) {
+    try {
+      syncService.dropPipe(pipeName);
+    } catch (PipeException e) {
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public TShowPipeResp showPipe(String pipeName) {
+    List<TPipeInfo> pipeInfos = SyncService.getInstance().showPipe(pipeName);
+    return new 
TShowPipeResp().setPipeInfoList(pipeInfos).setStatus(StatusUtils.OK);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 6c3e308799..90394b6041 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.external.api.ISeriesNumerLimiter;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -178,7 +177,6 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
   // device -> DeviceMNode
   private LoadingCache<PartialPath, IMNode> mNodeCache;
   private TagManager tagManager;
-  private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
 
   private final ISeriesNumerLimiter seriesNumerLimiter;
 
@@ -646,9 +644,6 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
         }
         plan.setTagOffset(offset);
         writeToMLog(plan);
-        if (syncManager.isEnableSync()) {
-          syncManager.syncMetadataPlan(plan);
-        }
       }
       if (offset != -1) {
         leafMNode.setOffset(offset);
@@ -792,9 +787,6 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
         }
         plan.setTagOffsets(tagOffsets);
         writeToMLog(plan);
-        if (syncManager.isEnableSync()) {
-          syncManager.syncMetadataPlan(plan);
-        }
       }
       tagOffsets = plan.getTagOffsets();
       for (int i = 0; i < measurements.size(); i++) {
@@ -861,9 +853,6 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
         }
         deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
         writeToMLog(deleteTimeSeriesPlan);
-        if (syncManager.isEnableSync()) {
-          syncManager.syncMetadataPlan(deleteTimeSeriesPlan);
-        }
       }
     } catch (DeleteFailedException e) {
       failedNames.add(e.getName());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 2194b4126a..051f78898f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -76,7 +76,6 @@ import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.external.api.ISeriesNumerLimiter;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -172,7 +171,6 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
   // device -> DeviceMNode
   private LoadingCache<PartialPath, IMNode> mNodeCache;
   private TagManager tagManager;
-  private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
 
   private final ISeriesNumerLimiter seriesNumerLimiter;
 
@@ -538,9 +536,6 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
           }
           plan.setTagOffset(offset);
           logWriter.createTimeseries(plan);
-          if (syncManager.isEnableSync()) {
-            syncManager.syncMetadataPlan(plan);
-          }
         }
         if (offset != -1) {
           leafMNode.setOffset(offset);
@@ -708,9 +703,6 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
           }
           plan.setTagOffsets(tagOffsets);
           logWriter.createAlignedTimeseries(plan);
-          if (syncManager.isEnableSync()) {
-            syncManager.syncMetadataPlan(plan);
-          }
         }
         tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -783,9 +775,6 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
         }
         deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
         logWriter.deleteTimeseries(deleteTimeSeriesPlan);
-        if (syncManager.isEnableSync()) {
-          syncManager.syncMetadataPlan(deleteTimeSeriesPlan);
-        }
       }
     } catch (DeleteFailedException e) {
       failedNames.add(e.getName());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 55faa87dab..e642a4504f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -96,6 +96,14 @@ public class ColumnHeaderConstant {
   public static final String COLUMN_PIPESINK_NAME = "name";
   public static final String COLUMN_PIPESINK_ATTRIBUTES = "attributes";
 
+  // column names for show pipe
+  public static final String COLUMN_PIPE_CREATE_TIME = "create time";
+  public static final String COLUMN_PIPE_NAME = "name";
+  public static final String COLUMN_PIPE_ROLE = "role";
+  public static final String COLUMN_PIPE_REMOTE = "remote";
+  public static final String COLUMN_PIPE_STATUS = "status";
+  public static final String COLUMN_PIPE_MESSAGE = "message";
+
   public static final List<ColumnHeader> lastQueryColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
@@ -235,4 +243,13 @@ public class ColumnHeaderConstant {
           new ColumnHeader(COLUMN_PIPESINK_NAME, TSDataType.TEXT),
           new ColumnHeader(COLUMN_PIPESINK_TYPE, TSDataType.TEXT),
           new ColumnHeader(COLUMN_PIPESINK_ATTRIBUTES, TSDataType.TEXT));
+
+  public static final List<ColumnHeader> showPipeColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(COLUMN_PIPE_CREATE_TIME, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_PIPE_NAME, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_PIPE_ROLE, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_PIPE_REMOTE, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_PIPE_STATUS, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_PIPE_MESSAGE, TSDataType.TEXT));
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index b079f3ad55..b6f9f5a6c3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -120,4 +120,8 @@ public class DatasetHeaderFactory {
   public static DatasetHeader getShowPipeSinkHeader() {
     return new DatasetHeader(ColumnHeaderConstant.showPipeSinkColumnHeaders, 
true);
   }
+
+  public static DatasetHeader getShowPipeHeader() {
+    return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f8853dd43e..0f770e6509 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -77,8 +77,13 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -659,7 +664,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> createPipe() {
+  public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement 
createPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     future.setException(
         new IoTDBException(
@@ -680,7 +685,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> dropPipe() {
+  public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement 
dropPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     future.setException(
         new IoTDBException(
@@ -701,7 +706,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> showPipe() {
+  public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement 
showPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     future.setException(
         new IoTDBException(
@@ -722,7 +727,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> startPipe() {
+  public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement 
startPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     future.setException(
         new IoTDBException(
@@ -732,7 +737,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> stopPipe() {
+  public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement 
stopPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     future.setException(
         new IoTDBException(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index e4f03578b2..861f42277c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -39,8 +39,13 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
 
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -119,13 +124,13 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> showPipeSink(ShowPipeSinkStatement 
showPipeSinkStatement);
 
-  SettableFuture<ConfigTaskResult> dropPipe();
+  SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement 
dropPipeStatement);
 
-  SettableFuture<ConfigTaskResult> createPipe();
+  SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement 
createPipeStatement);
 
-  SettableFuture<ConfigTaskResult> showPipe();
+  SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement 
startPipeStatement);
 
-  SettableFuture<ConfigTaskResult> startPipe();
+  SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement 
stopPipeStatement);
 
-  SettableFuture<ConfigTaskResult> stopPipe();
+  SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement 
showPipeStatement);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 3d48e6774b..27aa838107 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.trigger.enums.TriggerEvent;
 import org.apache.iotdb.commons.trigger.enums.TriggerType;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
@@ -38,6 +39,7 @@ import 
org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupT
 import 
org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
@@ -52,8 +54,13 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -466,16 +473,6 @@ public class StandaloneConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
-  @Override
-  public SettableFuture<ConfigTaskResult> createPipe() {
-    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing create pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
-    return future;
-  }
-
   @Override
   public SettableFuture<ConfigTaskResult> createPipeSink(
       CreatePipeSinkStatement createPipeSinkStatement) {
@@ -514,42 +511,59 @@ public class StandaloneConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> dropPipe() {
+  public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement 
createPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing drop pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    TSStatus tsStatus = 
LocalConfigNode.getInstance().createPipe(createPipeStatement);
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> showPipe() {
+  public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement 
startPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing show pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    TSStatus tsStatus = 
LocalConfigNode.getInstance().startPipe(startPipeStatement.getPipeName());
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> startPipe() {
+  public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement 
stopPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing Start pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    TSStatus tsStatus = 
LocalConfigNode.getInstance().stopPipe(stopPipeStatement.getPipeName());
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> stopPipe() {
+  public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement 
dropPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing stop pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    TSStatus tsStatus = 
LocalConfigNode.getInstance().dropPipe(dropPipeStatement.getPipeName());
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
+    }
+    return future;
+  }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement 
showPipeStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    TShowPipeResp showPipeResp =
+        
LocalConfigNode.getInstance().showPipe(showPipeStatement.getPipeName());
+    ShowPipeTask.buildTSBlock(showPipeResp.getPipeInfoList(), future);
     return future;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
index e01b159bf2..9b22eac678 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 public class CreatePipeTask implements IConfigTask {
 
-  private CreatePipeStatement createPipeStatement;
+  private final CreatePipeStatement createPipeStatement;
 
   public CreatePipeTask(CreatePipeStatement createPipeStatement) {
     this.createPipeStatement = createPipeStatement;
@@ -37,6 +37,6 @@ public class CreatePipeTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.createPipe();
+    return configTaskExecutor.createPipe(createPipeStatement);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
index b1236924e3..8a878de2dd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 public class DropPipeTask implements IConfigTask {
 
-  private DropPipeStatement dropPipeStatement;
+  private final DropPipeStatement dropPipeStatement;
 
   public DropPipeTask(DropPipeStatement dropPipeStatement) {
     this.dropPipeStatement = dropPipeStatement;
@@ -37,6 +37,6 @@ public class DropPipeTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.dropPipe();
+    return configTaskExecutor.dropPipe(dropPipeStatement);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
index 6ea56964b1..180cdd2d1d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
@@ -19,16 +19,30 @@
 
 package org.apache.iotdb.db.mpp.plan.execution.config.sys.sync;
 
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.stream.Collectors;
 
 public class ShowPipeTask implements IConfigTask {
 
-  private ShowPipeStatement showPipeStatement;
+  private final ShowPipeStatement showPipeStatement;
 
   public ShowPipeTask(ShowPipeStatement showPipeStatement) {
     this.showPipeStatement = showPipeStatement;
@@ -37,6 +51,29 @@ public class ShowPipeTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.showPipe();
+    return configTaskExecutor.showPipe(showPipeStatement);
+  }
+
+  public static void buildTSBlock(
+      List<TPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> future) {
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.showPipeColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+    for (TPipeInfo tPipeInfo : pipeInfoList) {
+      builder.getTimeColumnBuilder().writeLong(0L);
+      builder
+          .getColumnBuilder(0)
+          .writeBinary(new 
Binary(DatetimeUtils.convertLongToDate(tPipeInfo.getCreateTime())));
+      builder.getColumnBuilder(1).writeBinary(new 
Binary(tPipeInfo.getPipeName()));
+      builder.getColumnBuilder(2).writeBinary(new Binary(tPipeInfo.getRole()));
+      builder.getColumnBuilder(3).writeBinary(new 
Binary(tPipeInfo.getRemote()));
+      builder.getColumnBuilder(4).writeBinary(new 
Binary(tPipeInfo.getStatus()));
+      builder.getColumnBuilder(5).writeBinary(new 
Binary(tPipeInfo.getMessage()));
+      builder.declarePosition();
+    }
+    DatasetHeader datasetHeader = DatasetHeaderFactory.getShowPipeHeader();
+    future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, 
builder.build(), datasetHeader));
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
index aa10217815..03dc2143f4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 public class StartPipeTask implements IConfigTask {
 
-  private StartPipeStatement startPipeStatement;
+  private final StartPipeStatement startPipeStatement;
 
   public StartPipeTask(StartPipeStatement startPipeStatement) {
     this.startPipeStatement = startPipeStatement;
@@ -37,6 +37,6 @@ public class StartPipeTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.startPipe();
+    return configTaskExecutor.startPipe(startPipeStatement);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
index 14ae9a1a41..e53b4f93c5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 public class StopPipeTask implements IConfigTask {
 
-  private StopPipeStatement stopPipeStatement;
+  private final StopPipeStatement stopPipeStatement;
 
   public StopPipeTask(StopPipeStatement stopPipeStatement) {
     this.stopPipeStatement = stopPipeStatement;
@@ -37,6 +37,6 @@ public class StopPipeTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.stopPipe();
+    return configTaskExecutor.stopPipe(stopPipeStatement);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 1d29cb3c6e..143c6d315d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -2823,6 +2823,8 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     }
     if (ctx.syncAttributeClauses() != null) {
       
createPipeStatement.setPipeAttributes(parseSyncAttributeClauses(ctx.syncAttributeClauses()));
+    } else {
+      createPipeStatement.setPipeAttributes(new HashMap<>());
     }
     return createPipeStatement;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
index e2763b9c6b..a3d59b52b8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
@@ -20,13 +20,16 @@
 package org.apache.iotdb.db.mpp.plan.statement.sys.sync;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 
+import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -87,4 +90,39 @@ public class CreatePipeStatement extends Statement 
implements IConfigStatement {
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitCreatePipe(this, context);
   }
+
+  public static CreatePipeStatement parseString(String parsedString) throws 
IOException {
+    String[] split = 
parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    if (split.length < 4) {
+      throw new IOException("Parsing CreatePipePlan error. Attributes is less 
than expected.");
+    }
+    CreatePipeStatement statement = new 
CreatePipeStatement(StatementType.CREATE_PIPE);
+    statement.setPipeName(split[0]);
+    statement.setPipeSinkName(split[1]);
+    statement.setStartTime(Long.parseLong(split[2]));
+    int size = (Integer.parseInt(split[3]) << 1);
+    if (split.length != (size + 4)) {
+      throw new IOException("Parsing CreatePipePlan error. Attributes number 
is wrong.");
+    }
+    Map<String, String> attributes = new HashMap<>();
+    for (int i = 0; i < size; i += 2) {
+      attributes.put(split[i + 4], split[i + 5]);
+    }
+    statement.setPipeAttributes(attributes);
+    return statement;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    
builder.append(pipeName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    
builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    
builder.append(startTime).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    
builder.append(pipeAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    for (Map.Entry<String, String> entry : pipeAttributes.entrySet()) {
+      
builder.append(entry.getKey()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+      
builder.append(entry.getValue()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    }
+    return builder.toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java 
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 62a8699a2b..285c584228 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -28,9 +28,11 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
@@ -41,6 +43,7 @@ import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
 import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
 import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -59,6 +62,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +70,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -158,6 +163,7 @@ public class SyncService implements IService {
 
   // region Interfaces and Implementation of Pipe
 
+  // TODO(sync): delete this in new-standalone version
   public synchronized void addPipe(CreatePipePlan plan) throws PipeException {
     // check plan
     long currentTime = DatetimeUtils.currentTime();
@@ -197,6 +203,45 @@ public class SyncService implements IService {
     }
   }
 
+  public synchronized void addPipe(CreatePipeStatement statement) throws 
PipeException {
+    // check statement
+    long currentTime = DatetimeUtils.currentTime();
+    if (statement.getStartTime() > currentTime) {
+      throw new PipeException(
+          String.format(
+              "Start time %s is later than current time %s, this is not 
supported yet.",
+              DatetimeUtils.convertLongToDate(statement.getStartTime()),
+              DatetimeUtils.convertLongToDate(currentTime)));
+    }
+    // add pipe
+    TSStatus status = syncInfoFetcher.addPipe(statement, currentTime);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(status.message);
+    }
+
+    PipeSink runningPipeSink = getPipeSink(statement.getPipeSinkName());
+    runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(statement, 
runningPipeSink, currentTime);
+    if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
+      try {
+        senderManager = new SenderManager(runningPipe, (IoTDBPipeSink) 
runningPipeSink);
+      } catch (ClassCastException e) {
+        logger.error(
+            String.format(
+                "Cast Class to %s error when create pipe %s.",
+                IoTDBPipeSink.class.getName(), statement.getPipeName()),
+            e);
+        runningPipe = null;
+        throw new PipeException(
+            String.format(
+                "Wrong pipeSink type %s for create pipe %s",
+                runningPipeSink.getType(), runningPipeSink.getPipeSinkName()));
+      }
+    } else { // for external pipe
+      // == start ExternalPipeProcessor for send data to external pipe plugin
+      startExternalPipeManager(false);
+    }
+  }
+
   public synchronized void stopPipe(String pipeName) throws PipeException {
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
@@ -312,6 +357,40 @@ public class SyncService implements IService {
     }
   }
 
+  public List<TPipeInfo> showPipe(String pipeName) {
+    boolean showAll = StringUtils.isEmpty(pipeName);
+    List<TPipeInfo> list = new ArrayList<>();
+    // show pipe in sender
+    for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
+      if (showAll || pipeName.equals(pipe.getPipeName())) {
+        TPipeInfo tPipeInfo =
+            new TPipeInfo(
+                pipe.getCreateTime(),
+                pipe.getPipeName(),
+                SyncConstant.ROLE_SENDER,
+                pipe.getPipeSinkName(),
+                pipe.getStatus().name(),
+                "");
+        list.add(tPipeInfo);
+      }
+    }
+    // show pipe in receiver
+    for (TSyncIdentityInfo identityInfo : 
receiverManager.getAllTSyncIdentityInfos()) {
+      if (showAll || pipeName.equals(identityInfo.getPipeName())) {
+        TPipeInfo tPipeInfo =
+            new TPipeInfo(
+                identityInfo.getCreateTime(),
+                identityInfo.getPipeName(),
+                SyncConstant.ROLE_RECEIVER,
+                identityInfo.getAddress(),
+                Pipe.PipeStatus.RUNNING.name(),
+                "");
+        list.add(tPipeInfo);
+      }
+    }
+    return list;
+  }
+
   public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
     boolean showAll = "".equals(plan.getPipeName());
     // show pipe in sender
@@ -523,6 +602,22 @@ public class SyncService implements IService {
     }
   }
 
+  public List<ISyncManager> getOrCreateSyncManager(String dataRegionId) {
+    // TODO(sync): maybe add cache to accelerate
+    List<ISyncManager> syncManagerList = new ArrayList<>();
+    if (runningPipe != null) {
+      syncManagerList.add(runningPipe.getOrCreateSyncManager(dataRegionId));
+    }
+    return syncManagerList;
+  }
+
+  /** This method will be called before deleting dataRegion */
+  public void deleteSyncManager(String dataRegionId) {
+    if (runningPipe != null) {
+      runningPipe.deleteSyncManager(dataRegionId);
+    }
+  }
+
   @TestOnly
   public SenderManager getSenderManager() {
     return senderManager;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index a419d5b5c3..9e92d8360a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.common;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
@@ -45,9 +46,11 @@ public interface ISyncInfoFetcher {
 
   // region Interfaces of Pipe
 
-  // TODO: use CreatePipeNode as parameter
+  // TODO(sync): delete this in new-standalone version
   TSStatus addPipe(CreatePipePlan plan, long createTime);
 
+  TSStatus addPipe(CreatePipeStatement createPipeStatement, long createTime);
+
   TSStatus stopPipe(String pipeName);
 
   TSStatus startPipe(String pipeName);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 7f10723153..9ba37501e0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -22,8 +22,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
@@ -102,10 +103,20 @@ public class LocalSyncInfoFetcher implements 
ISyncInfoFetcher {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  @Override
+  public TSStatus addPipe(CreatePipeStatement createPipeStatement, long 
createTime) {
+    try {
+      syncInfo.addPipe(createPipeStatement, createTime);
+    } catch (PipeException | IOException e) {
+      RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
   @Override
   public TSStatus stopPipe(String pipeName) {
     try {
-      syncInfo.operatePipe(pipeName, Operator.OperatorType.STOP_PIPE);
+      syncInfo.operatePipe(pipeName, StatementType.STOP_PIPE);
     } catch (PipeException | IOException e) {
       RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     }
@@ -115,7 +126,7 @@ public class LocalSyncInfoFetcher implements 
ISyncInfoFetcher {
   @Override
   public TSStatus startPipe(String pipeName) {
     try {
-      syncInfo.operatePipe(pipeName, Operator.OperatorType.START_PIPE);
+      syncInfo.operatePipe(pipeName, StatementType.START_PIPE);
     } catch (PipeException | IOException e) {
       RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     }
@@ -125,7 +136,7 @@ public class LocalSyncInfoFetcher implements 
ISyncInfoFetcher {
   @Override
   public TSStatus dropPipe(String pipeName) {
     try {
-      syncInfo.operatePipe(pipeName, Operator.OperatorType.DROP_PIPE);
+      syncInfo.operatePipe(pipeName, StatementType.DROP_PIPE);
     } catch (PipeException | IOException e) {
       RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java 
b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 218e0994e4..560605b5e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -22,8 +22,9 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
@@ -144,7 +145,7 @@ public class SyncInfo {
   // endregion
 
   // region Implement of Pipe
-
+  // TODO: delete this in new-standalone version
   public void addPipe(CreatePipePlan plan, long createTime) throws 
PipeException, IOException {
     // common check
     if (runningPipe != null && runningPipe.getStatus() != 
Pipe.PipeStatus.DROP) {
@@ -163,10 +164,32 @@ public class SyncInfo {
     syncLogWriter.addPipe(plan, createTime);
   }
 
-  public void operatePipe(String pipeName, Operator.OperatorType operatorType)
+  public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
+      throws PipeException, IOException {
+    // common check
+    if (runningPipe != null && runningPipe.getStatus() != 
Pipe.PipeStatus.DROP) {
+      throw new PipeException(
+          String.format(
+              "Pipe %s is %s, please retry after drop it.",
+              runningPipe.getPipeName(), runningPipe.getStatus().name()));
+    }
+    if (!isPipeSinkExist(createPipeStatement.getPipeSinkName())) {
+      throw new PipeException(
+          String.format("Can not find pipeSink %s.", 
createPipeStatement.getPipeSinkName()));
+    }
+
+    PipeSink runningPipeSink = 
getPipeSink(createPipeStatement.getPipeSinkName());
+    runningPipe =
+        SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
+            createPipeStatement, runningPipeSink, createTime);
+    pipes.add(runningPipe);
+    syncLogWriter.addPipe(createPipeStatement, createTime);
+  }
+
+  public void operatePipe(String pipeName, StatementType statementType)
       throws PipeException, IOException {
     checkIfPipeExistAndRunning(pipeName);
-    switch (operatorType) {
+    switch (statementType) {
       case START_PIPE:
         runningPipe.start();
         break;
@@ -177,9 +200,9 @@ public class SyncInfo {
         runningPipe.drop();
         break;
       default:
-        throw new PipeException("Unknown operatorType " + operatorType);
+        throw new PipeException("Unknown operatorType " + statementType);
     }
-    syncLogWriter.operatePipe(pipeName, operatorType);
+    syncLogWriter.operatePipe(pipeName, statementType);
   }
 
   public List<PipeInfo> getAllPipeInfos() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index a9e91c09c7..344b4b8f08 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
 import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
@@ -128,11 +128,11 @@ public class SyncLogReader {
           case CREATE_PIPE:
             readLine = br.readLine();
             lineNumber += 1;
-            CreatePipePlan pipePlan = CreatePipePlan.parseString(readLine);
+            CreatePipeStatement createPipeStatement = 
CreatePipeStatement.parseString(readLine);
             runningPipe =
                 SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
-                    pipePlan,
-                    pipeSinks.get(pipePlan.getPipeSinkName()),
+                    createPipeStatement,
+                    pipeSinks.get(createPipeStatement.getPipeSinkName()),
                     Long.parseLong(parseStrings[1]));
             pipes.add(runningPipe);
             break;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
index 01e3068f15..556e85f337 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.sync.common.persistence;
 
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
@@ -55,6 +57,7 @@ public class SyncLogWriter {
     }
   }
 
+  // TODO(sync): delete this in new-standalone version
   public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws 
IOException {
     getBufferedWriter();
     pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
@@ -83,6 +86,7 @@ public class SyncLogWriter {
     pipeInfoWriter.flush();
   }
 
+  // TODO(sync): delete this in new-standalone version
   public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime) 
throws IOException {
     getBufferedWriter();
     pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPE.name());
@@ -94,9 +98,20 @@ public class SyncLogWriter {
     pipeInfoWriter.flush();
   }
 
-  public synchronized void operatePipe(String pipeName, Operator.OperatorType 
type)
+  public synchronized void addPipe(CreatePipeStatement createPipeStatement, 
long pipeCreateTime)
       throws IOException {
     getBufferedWriter();
+    pipeInfoWriter.write(createPipeStatement.getType().name());
+    pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
+    pipeInfoWriter.write(String.valueOf(pipeCreateTime));
+    pipeInfoWriter.newLine();
+    pipeInfoWriter.write(createPipeStatement.toString());
+    pipeInfoWriter.newLine();
+    pipeInfoWriter.flush();
+  }
+
+  public synchronized void operatePipe(String pipeName, StatementType type) 
throws IOException {
+    getBufferedWriter();
     pipeInfoWriter.write(type.name());
     pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
     pipeInfoWriter.write(pipeName);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java 
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index c46e66a095..70746879f2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -18,21 +18,33 @@
  */
 package org.apache.iotdb.db.sync.receiver.load;
 
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
-import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
 
 /** This loader is used to load tsFiles. If .mods file exists, it will be 
loaded as well. */
 public class TsFileLoader implements ILoader {
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileLoader.class);
+  private static PlanExecutor planExecutor;
 
-  private File tsFile;
+  static {
+    try {
+      planExecutor = new PlanExecutor();
+    } catch (QueryProcessException e) {
+      logger.error(e.getMessage());
+    }
+  }
+
+  private final File tsFile;
 
   public TsFileLoader(File tsFile) {
     this.tsFile = tsFile;
@@ -41,24 +53,14 @@ public class TsFileLoader implements ILoader {
   @Override
   public void load() throws PipeDataLoadException {
     try {
-      TsFileResource tsFileResource = new TsFileResource(tsFile);
-      tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
-      FileLoaderUtils.loadOrGenerateResource(tsFileResource);
-      List<TsFileResource> splitResources = new ArrayList();
-      if (tsFileResource.isSpanMultiTimePartitions()) {
-        TsFileSplitByPartitionTool.rewriteTsFile(tsFileResource, 
splitResources);
-        tsFileResource.writeLock();
-        tsFileResource.removeModFile();
-        tsFileResource.writeUnlock();
-      }
-
-      if (splitResources.isEmpty()) {
-        splitResources.add(tsFileResource);
-      }
-
-      for (TsFileResource resource : splitResources) {
-        StorageEngine.getInstance().loadNewTsFile(resource, false);
-      }
+      PhysicalPlan plan =
+          new OperateFilePlan(
+              tsFile,
+              Operator.OperatorType.LOAD_FILES,
+              true,
+              
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(),
+              true);
+      planExecutor.processNonQuery(plan);
     } catch (Exception e) {
       throw new PipeDataLoadException(e.getMessage());
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/ISyncManager.java
similarity index 50%
copy from 
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
copy to 
server/src/main/java/org/apache/iotdb/db/sync/sender/manager/ISyncManager.java
index 14ae9a1a41..33bdf79f33 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/ISyncManager.java
@@ -16,27 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.sync.sender.manager;
 
-package org.apache.iotdb.db.mpp.plan.execution.config.sys.sync;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 
-import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
-import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
-import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
+import java.io.File;
+import java.util.List;
 
-import com.google.common.util.concurrent.ListenableFuture;
+/**
+ * ISyncManager is designed for collect all history TsFiles(i.e. before the 
pipe start time, all
+ * tsfiles whose memtable is set to null.) and realtime TsFiles for registered 
{@link TsFilePipe}.
+ */
+public interface ISyncManager {
+  /** tsfile */
+  void syncRealTimeDeletion(Deletion deletion);
+
+  void syncRealTimeTsFile(File tsFile);
 
-public class StopPipeTask implements IConfigTask {
+  void syncRealTimeResource(File tsFile);
 
-  private StopPipeStatement stopPipeStatement;
+  List<File> syncHistoryTsFile(long dataStartTime);
 
-  public StopPipeTask(StopPipeStatement stopPipeStatement) {
-    this.stopPipeStatement = stopPipeStatement;
-  }
+  File createHardlink(File tsFile, long modsOffset);
 
-  @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
-      throws InterruptedException {
-    return configTaskExecutor.stopPipe();
-  }
+  void delete();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/LocalSyncManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/LocalSyncManager.java
new file mode 100644
index 0000000000..b366ab3e2a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/LocalSyncManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.manager;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class LocalSyncManager implements ISyncManager {
+
+  private TsFilePipe syncPipe;
+  private final DataRegion dataRegion;
+
+  public LocalSyncManager(DataRegion dataRegion, Pipe pipe) {
+    this.dataRegion = dataRegion;
+    this.syncPipe = (TsFilePipe) pipe;
+  }
+
+  /** tsfile */
+  @Override
+  public void syncRealTimeDeletion(Deletion deletion) {
+    syncPipe.collectRealTimeDeletion(deletion, 
dataRegion.getStorageGroupName());
+  }
+
+  @Override
+  public void syncRealTimeTsFile(File tsFile) {
+    syncPipe.collectRealTimeTsFile(tsFile);
+  }
+
+  @Override
+  public void syncRealTimeResource(File tsFile) {
+    syncPipe.collectRealTimeResource(tsFile);
+  }
+
+  @Override
+  public List<File> syncHistoryTsFile(long dataStartTime) {
+    return new ArrayList<>(this.dataRegion.collectHistoryTsFileForSync(this, 
dataStartTime));
+  }
+
+  @Override
+  public File createHardlink(File tsFile, long modsOffset) {
+    return syncPipe.createHistoryTsFileHardlink(tsFile, modsOffset);
+  }
+
+  @Override
+  public void delete() {
+    // TODO(sync): parse to delete operation and sync
+    // 1、get timeseries
+    // 2、get time partition
+    //    syncPipe.collectRealTimeDeletion();
+  }
+
+  public static List<PartialPath> splitPathPatternByDevice(PartialPath 
pathPattern)
+      throws MetadataException {
+    Set<PartialPath> devices = 
LocalSchemaProcessor.getInstance().getBelongedDevices(pathPattern);
+    List<PartialPath> resultPathPattern = new LinkedList<>();
+    for (PartialPath device : devices) {
+      pathPattern.alterPrefixPath(device).stream()
+          .filter(i -> !i.equals(device))
+          .forEach(resultPathPattern::add);
+    }
+    return resultPathPattern;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
deleted file mode 100644
index f04ea874a1..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender.manager;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
-
-/**
- * SchemaSyncManager is designed to collect history metadata(i.e. all storage 
group and all
- * timeseries in IoTDB), and realtime metadata(i.e. create storage group, 
timeseries and delete
- * timeseries operations) for all registered {@linkplain TsFilePipe}.
- */
-public class SchemaSyncManager {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(SchemaSyncManager.class);
-
-  private TsFilePipe syncPipe = null;
-
-  private static class SchemaSyncManagerHolder {
-
-    private SchemaSyncManagerHolder() {
-      // allowed to do nothing
-    }
-
-    private static final SchemaSyncManager INSTANCE = new SchemaSyncManager();
-  }
-
-  public static SchemaSyncManager getInstance() {
-    return SchemaSyncManagerHolder.INSTANCE;
-  }
-
-  public void registerSyncTask(TsFilePipe syncPipe) {
-    this.syncPipe = syncPipe;
-  }
-
-  public void deregisterSyncTask() {
-    this.syncPipe = null;
-  }
-
-  public boolean isEnableSync() {
-    return syncPipe != null;
-  }
-
-  public void syncMetadataPlan(PhysicalPlan plan) {
-    syncPipe.collectRealTimeMetaData(plan);
-  }
-
-  public void clear() {
-    this.syncPipe = null;
-  }
-
-  /** only support for SchemaRegion */
-  public List<PhysicalPlan> collectHistoryMetadata() {
-    List<PhysicalPlan> historyMetadata = new ArrayList<>();
-    List<SetStorageGroupPlan> storageGroupPlanList = getStorageGroupAsPlan();
-    for (SetStorageGroupPlan storageGroupPlan : storageGroupPlanList) {
-      historyMetadata.add(storageGroupPlan);
-    }
-
-    for (ISchemaRegion schemaRegion : 
SchemaEngine.getInstance().getAllSchemaRegions()) {
-      try {
-        for (MeasurementPath measurementPath :
-            schemaRegion.getMeasurementPaths(new 
PartialPath(ALL_RESULT_NODES), false)) {
-          if (measurementPath.isUnderAlignedEntity()) {
-            historyMetadata.add(
-                new CreateAlignedTimeSeriesPlan(
-                    measurementPath.getDevicePath(),
-                    measurementPath.getMeasurement(),
-                    (MeasurementSchema) 
measurementPath.getMeasurementSchema()));
-          } else {
-            historyMetadata.add(
-                new CreateTimeSeriesPlan(
-                    measurementPath, (MeasurementSchema) 
measurementPath.getMeasurementSchema()));
-          }
-        }
-      } catch (MetadataException e) {
-        logger.warn(
-            String.format(
-                "Collect history schema from schemaRegion: %s of sg %s error. 
Skip this schemaRegion.",
-                schemaRegion.getSchemaRegionId(), 
schemaRegion.getStorageGroupFullPath()));
-      }
-    }
-
-    return historyMetadata;
-  }
-
-  private List<SetStorageGroupPlan> getStorageGroupAsPlan() {
-    List<PartialPath> allStorageGroups = 
IoTDB.configManager.getAllStorageGroupPaths();
-    List<SetStorageGroupPlan> result = new LinkedList<>();
-    for (PartialPath sgPath : allStorageGroups) {
-      result.add(new SetStorageGroupPlan(sgPath));
-    }
-    return result;
-  }
-
-  public DeleteTimeSeriesPlan splitDeleteTimeseriesPlanByDevice(PartialPath 
pathPattern)
-      throws MetadataException {
-    return new DeleteTimeSeriesPlan(splitPathPatternByDevice(pathPattern));
-  }
-
-  public List<PartialPath> splitPathPatternByDevice(PartialPath pathPattern)
-      throws MetadataException {
-    Set<PartialPath> devices = 
IoTDB.schemaProcessor.getBelongedDevices(pathPattern);
-    List<PartialPath> resultPathPattern = new LinkedList<>();
-    for (PartialPath device : devices) {
-      pathPattern.alterPrefixPath(device).stream()
-          .filter(i -> !i.equals(device))
-          .forEach(resultPathPattern::add);
-    }
-    return resultPathPattern;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
deleted file mode 100644
index b68868167d..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender.manager;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.storagegroup.dataregion.StorageGroupManager;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * TsFileSyncManager is designed for collect all history TsFiles(i.e. before 
the pipe start time,
- * all tsfiles whose memtable is set to null.), and realtime tsfiles for 
registered {@linkplain
- * TsFilePipe}.
- */
-public class TsFileSyncManager {
-  private static final Logger logger = 
LoggerFactory.getLogger(TsFileSyncManager.class);
-
-  private TsFilePipe syncPipe;
-
-  /** singleton */
-  private TsFileSyncManager() {}
-
-  private static class TsFileSyncManagerHolder {
-    private static final TsFileSyncManager INSTANCE = new TsFileSyncManager();
-
-    private TsFileSyncManagerHolder() {}
-  }
-
-  public static TsFileSyncManager getInstance() {
-    return TsFileSyncManager.TsFileSyncManagerHolder.INSTANCE;
-  }
-
-  /** register */
-  public void registerSyncTask(TsFilePipe syncPipe) {
-    this.syncPipe = syncPipe;
-  }
-
-  public void deregisterSyncTask() {
-    this.syncPipe = null;
-  }
-
-  public boolean isEnableSync() {
-    return syncPipe != null;
-  }
-
-  public void clear() {
-    syncPipe = null;
-  }
-
-  /** tsfile */
-  public void collectRealTimeDeletion(Deletion deletion, String sgName) {
-    syncPipe.collectRealTimeDeletion(deletion, sgName);
-  }
-
-  public void collectRealTimeTsFile(File tsFile) {
-    syncPipe.collectRealTimeTsFile(tsFile);
-  }
-
-  public void collectRealTimeResource(File tsFile) {
-    syncPipe.collectRealTimeResource(tsFile);
-  }
-
-  public List<File> registerAndCollectHistoryTsFile(TsFilePipe syncPipe, long 
dataStartTime) {
-    registerSyncTask(syncPipe);
-
-    List<File> historyTsFiles = new ArrayList<>();
-    Iterator<Map.Entry<PartialPath, StorageGroupManager>> sgIterator =
-        StorageEngine.getInstance().getProcessorMap().entrySet().iterator();
-    while (sgIterator.hasNext()) {
-      historyTsFiles.addAll(
-          
sgIterator.next().getValue().collectHistoryTsFileForSync(dataStartTime));
-    }
-
-    return historyTsFiles;
-  }
-
-  public File createHardlink(File tsFile, long modsOffset) {
-    return syncPipe.createHistoryTsFileHardlink(tsFile, modsOffset);
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index f1afd9345a..773e591110 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
 import org.apache.iotdb.db.sync.transport.client.ISyncClient;
 
 /**
@@ -107,6 +108,17 @@ public interface Pipe {
    */
   void commit();
 
+  /**
+   * Get {@linkplain ISyncManager} by dataRegionId. If ISyncManager does not 
exist, it will be
+   * created automatically.
+   *
+   * @param dataRegionId string of {@linkplain 
org.apache.iotdb.commons.consensus.DataRegionId}
+   * @return ISyncManager
+   */
+  ISyncManager getOrCreateSyncManager(String dataRegionId);
+
+  void deleteSyncManager(String dataRegionId);
+
   // a new pipe should be stop status
   enum PipeStatus {
     RUNNING,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 55f0551000..03d1bb89d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -19,19 +19,20 @@
  */
 package org.apache.iotdb.db.sync.sender.pipe;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
+import org.apache.iotdb.db.sync.sender.manager.LocalSyncManager;
 import org.apache.iotdb.db.sync.sender.recovery.TsFilePipeLogger;
 
 import org.slf4j.Logger;
@@ -41,13 +42,15 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class TsFilePipe implements Pipe {
   private static final Logger logger = 
LoggerFactory.getLogger(TsFilePipe.class);
-  private final SchemaSyncManager schemaSyncManager = 
SchemaSyncManager.getInstance();
-  private final TsFileSyncManager tsFileSyncManager = 
TsFileSyncManager.getInstance();
+  // <dataRegionId, ISyncManager>
+  private final Map<String, ISyncManager> syncManagerMap = new 
ConcurrentHashMap<>();
 
   private final long createTime;
   private final String name;
@@ -60,7 +63,6 @@ public class TsFilePipe implements Pipe {
   private final TsFilePipeLogger pipeLog;
   private final ReentrantLock collectRealTimeDataLock;
 
-  private boolean isCollectingRealTimeData;
   private long maxSerialNumber;
 
   private PipeStatus status;
@@ -80,7 +82,6 @@ public class TsFilePipe implements Pipe {
     this.pipeLog = new TsFilePipeLogger(this);
     this.collectRealTimeDataLock = new ReentrantLock();
 
-    this.isCollectingRealTimeData = false;
     this.maxSerialNumber = Math.max(0L, 
realTimeQueue.getLastMaxSerialNumber());
 
     this.status = PipeStatus.STOP;
@@ -95,42 +96,40 @@ public class TsFilePipe implements Pipe {
       return;
     }
 
+    // init sync manager
+    List<DataRegion> dataRegions = 
StorageEngineV2.getInstance().getAllDataRegions();
+    for (DataRegion dataRegion : dataRegions) {
+      logger.info(
+          logFormat(
+              "init syncManager for %s-%s",
+              dataRegion.getStorageGroupName(), dataRegion.getDataRegionId()));
+      syncManagerMap.put(dataRegion.getDataRegionId(), new 
LocalSyncManager(dataRegion, this));
+    }
     try {
       if (!pipeLog.isCollectFinished()) {
         pipeLog.clear();
-        collectData();
+        collectHistoryData();
         pipeLog.finishCollect();
       }
-      if (!isCollectingRealTimeData) {
-        registerMetadata();
-        registerTsFile();
-        isCollectingRealTimeData = true;
-      }
 
       status = PipeStatus.RUNNING;
     } catch (IOException e) {
       logger.error(
-          String.format(
-              "Clear pipe dir %s error.", SyncPathUtil.getSenderPipeDir(name, 
createTime)),
+          logFormat("Clear pipe dir %s error.", 
SyncPathUtil.getSenderPipeDir(name, createTime)),
           e);
       throw new PipeException("Start error, can not clear pipe log.");
     }
   }
 
   /** collect data * */
-  private void collectData() {
-    registerMetadata();
-    List<PhysicalPlan> historyMetadata = collectHistoryMetadata();
-    List<File> historyTsFiles = registerAndCollectHistoryTsFile();
-    isCollectingRealTimeData = true;
-
-    // get all history data
-    int historyMetadataSize = historyMetadata.size();
-    int historyTsFilesSize = historyTsFiles.size();
-    for (int i = 0; i < historyMetadataSize; i++) {
-      long serialNumber = 1 - historyTsFilesSize - historyMetadataSize + i;
-      historyQueue.offer(new SchemaPipeData(historyMetadata.get(i), 
serialNumber));
+  private void collectHistoryData() {
+    // collect history TsFile
+    List<File> historyTsFiles = new ArrayList<>();
+    for (ISyncManager syncManager : syncManagerMap.values()) {
+      historyTsFiles.addAll(syncManager.syncHistoryTsFile(dataStartTime));
     }
+    // put history data into PipeDataQueue
+    int historyTsFilesSize = historyTsFiles.size();
     for (int i = 0; i < historyTsFilesSize; i++) {
       long serialNumber = 1 - historyTsFilesSize + i;
       File tsFile = historyTsFiles.get(i);
@@ -138,41 +137,6 @@ public class TsFilePipe implements Pipe {
     }
   }
 
-  private void registerMetadata() {
-    schemaSyncManager.registerSyncTask(this);
-  }
-
-  private void deregisterMetadata() {
-    schemaSyncManager.deregisterSyncTask();
-  }
-
-  private List<PhysicalPlan> collectHistoryMetadata() {
-    return schemaSyncManager.collectHistoryMetadata();
-  }
-
-  public void collectRealTimeMetaData(PhysicalPlan plan) {
-    collectRealTimeDataLock.lock();
-    try {
-      maxSerialNumber += 1L;
-      PipeData metaData = new SchemaPipeData(plan, maxSerialNumber);
-      realTimeQueue.offer(metaData);
-    } finally {
-      collectRealTimeDataLock.unlock();
-    }
-  }
-
-  private void registerTsFile() {
-    tsFileSyncManager.registerSyncTask(this);
-  }
-
-  private void deregisterTsFile() {
-    tsFileSyncManager.deregisterSyncTask();
-  }
-
-  private List<File> registerAndCollectHistoryTsFile() {
-    return tsFileSyncManager.registerAndCollectHistoryTsFile(this, 
dataStartTime);
-  }
-
   public File createHistoryTsFileHardlink(File tsFile, long modsOffset) {
     collectRealTimeDataLock.lock(); // synchronize the pipeLog.isHardlinkExist
     try {
@@ -182,8 +146,7 @@ public class TsFilePipe implements Pipe {
 
       return pipeLog.createTsFileAndModsHardlink(tsFile, modsOffset);
     } catch (IOException e) {
-      logger.error(
-          String.format("Create hardlink for history tsfile %s error.", 
tsFile.getPath()), e);
+      logger.error(logFormat("Create hardlink for history tsfile %s error.", 
tsFile.getPath()), e);
       return null;
     } finally {
       collectRealTimeDataLock.unlock();
@@ -197,8 +160,7 @@ public class TsFilePipe implements Pipe {
         return;
       }
 
-      for (PartialPath deletePath :
-          schemaSyncManager.splitPathPatternByDevice(deletion.getPath())) {
+      for (PartialPath deletePath : 
LocalSyncManager.splitPathPatternByDevice(deletion.getPath())) {
         Deletion splitDeletion =
             new Deletion(
                 deletePath,
@@ -210,7 +172,7 @@ public class TsFilePipe implements Pipe {
         realTimeQueue.offer(deletionData);
       }
     } catch (MetadataException e) {
-      logger.warn(String.format("Collect deletion %s error.", deletion), e);
+      logger.warn(logFormat("Collect deletion %s error.", deletion), e);
     } finally {
       collectRealTimeDataLock.unlock();
     }
@@ -230,7 +192,7 @@ public class TsFilePipe implements Pipe {
       realTimeQueue.offer(tsFileData);
     } catch (IOException e) {
       logger.warn(
-          String.format(
+          logFormat(
               "Create Hardlink tsfile %s on disk error, serial number is %d.",
               tsFile.getPath(), maxSerialNumber),
           e);
@@ -243,13 +205,14 @@ public class TsFilePipe implements Pipe {
     try {
       pipeLog.createTsFileResourceHardlink(tsFile);
     } catch (IOException e) {
-      logger.warn(String.format("Record tsfile resource %s on disk error.", 
tsFile.getPath()), e);
+      logger.warn(logFormat("Record tsfile resource %s on disk error.", 
tsFile.getPath()), e);
     }
   }
 
   /** transport data * */
   @Override
   public PipeData take() throws InterruptedException {
+    // TODO: should judge isCollectingRealTimeData here (field removed by this change)
     if (!historyQueue.isEmpty()) {
       return historyQueue.take();
     }
@@ -275,6 +238,23 @@ public class TsFilePipe implements Pipe {
     realTimeQueue.commit();
   }
 
+  @Override
+  public ISyncManager getOrCreateSyncManager(String dataRegionId) {
+    return syncManagerMap.computeIfAbsent(
+        dataRegionId,
+        id ->
+            new LocalSyncManager(
+                StorageEngineV2.getInstance().getDataRegion(new 
DataRegionId(Integer.parseInt(id))),
+                this));
+  }
+
+  @Override
+  public void deleteSyncManager(String dataRegionId) {
+    if (syncManagerMap.containsKey(dataRegionId)) {
+      syncManagerMap.remove(dataRegionId).delete();
+    }
+  }
+
   public void commit(long serialNumber) {
     if (!historyQueue.isEmpty()) {
       historyQueue.commit(serialNumber);
@@ -290,13 +270,6 @@ public class TsFilePipe implements Pipe {
       throw new PipeException(
           String.format("Can not stop pipe %s, because the pipe is drop.", 
name));
     }
-
-    if (!isCollectingRealTimeData) {
-      registerMetadata();
-      registerTsFile();
-      isCollectingRealTimeData = true;
-    }
-
     status = PipeStatus.STOP;
   }
 
@@ -311,29 +284,24 @@ public class TsFilePipe implements Pipe {
   }
 
   private void clear() {
-    deregisterMetadata();
-    deregisterTsFile();
-    isCollectingRealTimeData = false;
-
     try {
       historyQueue.clear();
       realTimeQueue.clear();
       pipeLog.clear();
     } catch (IOException e) {
-      logger.warn(String.format("Clear pipe %s %d error.", name, createTime), 
e);
+      logger.warn(logFormat("Clear pipe %s %d error.", name, createTime), e);
     }
   }
 
+  private String logFormat(String format, Object... arguments) {
+    return String.format(String.format("[%s-%s] ", this.name, this.createTime) 
+ format, arguments);
+  }
+
   @Override
   public void close() throws PipeException {
     if (status == PipeStatus.DROP) {
       return;
     }
-
-    deregisterMetadata();
-    deregisterTsFile();
-    isCollectingRealTimeData = false;
-
     historyQueue.close();
     realTimeQueue.close();
   }
@@ -374,8 +342,6 @@ public class TsFilePipe implements Pipe {
         + syncDelOp
         + ", pipeLog="
         + pipeLog
-        + ", isCollectingRealTimeData="
-        + isCollectingRealTimeData
         + ", maxSerialNumber="
         + maxSerialNumber
         + ", status="
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
index 197946012f..3705e5b1ff 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
@@ -96,7 +96,6 @@ public class SenderManager {
                       PipeMessage.MsgType.WARN,
                       String.format(
                           "Transfer piepdata %s error, skip it.", 
pipeData.getSerialNumber()));
-              continue;
             }
             pipe.commit();
           }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java 
b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index 145130c5ee..704600a436 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.sync;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -30,6 +31,8 @@ import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipeInfo;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.util.Map;
+
 public class SyncPipeUtil {
 
   // TODO(sync): delete this in new-standalone version
@@ -61,6 +64,7 @@ public class SyncPipeUtil {
     return pipeSink;
   }
 
+  // TODO(sync): delete this in new-standalone version
   public static Pipe parseCreatePipePlanAsPipe(
       CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws 
PipeException {
     boolean syncDelOp = true;
@@ -77,6 +81,28 @@ public class SyncPipeUtil {
         pipeCreateTime, plan.getPipeName(), pipeSink, 
plan.getDataStartTimestamp(), syncDelOp);
   }
 
+  public static Pipe parseCreatePipePlanAsPipe(
+      CreatePipeStatement createPipeStatement, PipeSink pipeSink, long 
pipeCreateTime)
+      throws PipeException {
+    boolean syncDelOp = true;
+    for (Map.Entry<String, String> entry : 
createPipeStatement.getPipeAttributes().entrySet()) {
+      String attributeKey = entry.getKey().toLowerCase();
+      if ("syncdelop".equals(attributeKey)) {
+        syncDelOp = Boolean.parseBoolean(entry.getValue());
+      } else {
+        throw new PipeException(String.format("Can not recognition attribute 
%s", entry.getKey()));
+      }
+    }
+
+    return new TsFilePipe(
+        pipeCreateTime,
+        createPipeStatement.getPipeName(),
+        pipeSink,
+        createPipeStatement.getStartTime(),
+        syncDelOp);
+  }
+
+  // TODO(sync): delete this in new-standalone version
   public static PipeInfo parseCreatePipePlanAsPipeInfo(
       CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws 
PipeException {
     boolean syncDelOp = true;
@@ -97,6 +123,27 @@ public class SyncPipeUtil {
         syncDelOp);
   }
 
+  public static PipeInfo parseCreatePipePlanAsPipeInfo(
+      CreatePipeStatement createPipeStatement, PipeSink pipeSink, long 
pipeCreateTime)
+      throws PipeException {
+    boolean syncDelOp = true;
+    for (Map.Entry<String, String> entry : 
createPipeStatement.getPipeAttributes().entrySet()) {
+      String attributeKey = entry.getKey().toLowerCase();
+      if ("syncdelop".equals(attributeKey)) {
+        syncDelOp = Boolean.parseBoolean(entry.getValue());
+      } else {
+        throw new PipeException(String.format("Can not recognition attribute 
%s", entry.getKey()));
+      }
+    }
+
+    return new TsFilePipeInfo(
+        createPipeStatement.getPipeName(),
+        pipeSink.getPipeSinkName(),
+        pipeCreateTime,
+        createPipeStatement.getStartTime(),
+        syncDelOp);
+  }
+
   /** parse PipeInfo ass Pipe, ignore status */
   public static Pipe parsePipeInfoAsPipe(PipeInfo pipeInfo, PipeSink pipeSink)
       throws PipeException {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index b7c8ed1117..5b102d45e3 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -71,7 +71,8 @@ public class TsFileProcessorTest {
 
   private TsFileProcessor processor;
   private String storageGroup = "root.vehicle";
-  private StorageGroupInfo sgInfo = new StorageGroupInfo(null);
+  private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+  private StorageGroupInfo sgInfo;
   private String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 
0, 0);
   private String deviceId = "root.vehicle.d0";
   private String measurementId = "s0";
@@ -82,12 +83,13 @@ public class TsFileProcessorTest {
   private static Logger logger = 
LoggerFactory.getLogger(TsFileProcessorTest.class);
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     File file = new File(filePath);
     if (!file.getParentFile().exists()) {
       Assert.assertTrue(file.getParentFile().mkdirs());
     }
     EnvironmentUtils.envSetUp();
+    sgInfo = new StorageGroupInfo(new 
DataRegionTest.DummyDataRegion(systemDir, storageGroup));
     MetadataManagerHelper.initMetadata();
     context = EnvironmentUtils.TEST_QUERY_CONTEXT;
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
index 6ea46e968d..7ac36ca6c1 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.sync.receiver.manager;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.SyncInfo;
@@ -72,10 +72,10 @@ public class SyncInfoTest {
       } catch (PipeException e) {
         // throw exception because only one pipe is allowed now
       }
-      syncInfo.operatePipe(pipe1, Operator.OperatorType.DROP_PIPE);
+      syncInfo.operatePipe(pipe1, StatementType.DROP_PIPE);
       syncInfo.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
-      syncInfo.operatePipe(pipe2, Operator.OperatorType.STOP_PIPE);
-      syncInfo.operatePipe(pipe2, Operator.OperatorType.START_PIPE);
+      syncInfo.operatePipe(pipe2, StatementType.STOP_PIPE);
+      syncInfo.operatePipe(pipe2, StatementType.START_PIPE);
       Assert.assertEquals(2, syncInfo.getAllPipeInfos().size());
       Assert.assertEquals(1, syncInfo.getAllPipeSink().size());
       PipeMessage info = new PipeMessage(PipeMessage.MsgType.INFO, "info");
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index 26919b948e..d072fe166e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.sync.receiver.recovery;
 
 import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
@@ -69,11 +69,11 @@ public class SyncLogTest {
       createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
       log.addPipeSink(createPipeSinkPlan);
       log.addPipe(new CreatePipePlan(pipe1, "demo"), 1);
-      log.operatePipe(pipe1, Operator.OperatorType.DROP_PIPE);
+      log.operatePipe(pipe1, StatementType.DROP_PIPE);
 
       log.addPipe(new CreatePipePlan(pipe2, "demo"), 2);
-      log.operatePipe(pipe1, Operator.OperatorType.STOP_PIPE);
-      log.operatePipe(pipe1, Operator.OperatorType.START_PIPE);
+      log.operatePipe(pipe1, StatementType.STOP_PIPE);
+      log.operatePipe(pipe1, StatementType.START_PIPE);
       log.close();
       SyncLogReader syncLogReader = new SyncLogReader();
       syncLogReader.recover();
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 3149dd0ba0..e177f8bcac 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -364,6 +364,21 @@ struct TGetPathsSetTemplatesResp {
   2: optional list<string> pathList
 }
 
+// Show pipe
+struct TPipeInfo {
+  1: required i64 createTime
+  2: required string pipeName
+  3: required string role
+  4: required string remote
+  5: required string status
+  6: required string message
+}
+
+struct TShowPipeResp {
+  1: required common.TSStatus status
+  2: optional list<TPipeInfo> pipeInfoList
+}
+
 service IConfigNodeRPCService {
 
   // ======================================================

Reply via email to