This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 94500581f6 [IOTDB-3224][IOTDB-3949] Sync pipe execution and data
collection process in standalone version (#7154)
94500581f6 is described below
commit 94500581f6ceb8939287c184fb992d39aa4fd55d
Author: Chen YZ <[email protected]>
AuthorDate: Mon Sep 5 15:35:56 2022 +0800
[IOTDB-3224][IOTDB-3949] Sync pipe execution and data collection process in
standalone version (#7154)
---
.../{IoTDBPipeSinkIT.java => IoTDBPipeIT.java} | 82 ++++++++----
.../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java | 4 +-
.../db/integration/sync/IoTDBSyncSenderIT.java | 1 +
.../apache/iotdb/commons/sync/SyncConstant.java | 4 +-
.../apache/iotdb/db/engine/StorageEngineV2.java | 6 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 21 +--
.../db/engine/storagegroup/TsFileManager.java | 14 +-
.../db/engine/storagegroup/TsFileProcessor.java | 24 ++--
.../dataregion/StorageGroupManager.java | 10 --
.../iotdb/db/localconfignode/LocalConfigNode.java | 65 ++++++---
.../schemaregion/SchemaRegionMemoryImpl.java | 11 --
.../schemaregion/SchemaRegionSchemaFileImpl.java | 11 --
.../db/mpp/common/header/ColumnHeaderConstant.java | 17 +++
.../db/mpp/common/header/DatasetHeaderFactory.java | 4 +
.../config/executor/ClusterConfigTaskExecutor.java | 15 ++-
.../config/executor/IConfigTaskExecutor.java | 15 ++-
.../executor/StandaloneConfigTaskExecutor.java | 74 +++++-----
.../execution/config/sys/sync/CreatePipeTask.java | 4 +-
.../execution/config/sys/sync/DropPipeTask.java | 4 +-
.../execution/config/sys/sync/ShowPipeTask.java | 41 +++++-
.../execution/config/sys/sync/StartPipeTask.java | 4 +-
.../execution/config/sys/sync/StopPipeTask.java | 4 +-
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 2 +
.../statement/sys/sync/CreatePipeStatement.java | 38 ++++++
.../java/org/apache/iotdb/db/sync/SyncService.java | 95 +++++++++++++
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 5 +-
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 19 ++-
.../org/apache/iotdb/db/sync/common/SyncInfo.java | 35 ++++-
.../db/sync/common/persistence/SyncLogReader.java | 8 +-
.../db/sync/common/persistence/SyncLogWriter.java | 17 ++-
.../iotdb/db/sync/receiver/load/TsFileLoader.java | 54 ++++----
.../sender/manager/ISyncManager.java} | 34 ++---
.../db/sync/sender/manager/LocalSyncManager.java | 90 +++++++++++++
.../db/sync/sender/manager/SchemaSyncManager.java | 149 ---------------------
.../db/sync/sender/manager/TsFileSyncManager.java | 106 ---------------
.../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 12 ++
.../iotdb/db/sync/sender/pipe/TsFilePipe.java | 144 ++++++++------------
.../db/sync/transport/client/SenderManager.java | 1 -
.../apache/iotdb/db/utils/sync/SyncPipeUtil.java | 47 +++++++
.../engine/storagegroup/TsFileProcessorTest.java | 6 +-
.../db/sync/receiver/manager/SyncInfoTest.java | 8 +-
.../db/sync/receiver/recovery/SyncLogTest.java | 8 +-
.../src/main/thrift/confignode.thrift | 15 +++
43 files changed, 757 insertions(+), 571 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
similarity index 56%
copy from
integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
index 51dbcf99de..96907efc55 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -36,9 +37,11 @@ import java.sql.Statement;
import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual;
+// TODO: this test only support for new standalone now
+@Ignore
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
-public class IoTDBPipeSinkIT {
+public class IoTDBPipeIT {
@BeforeClass
public static void setUp() throws Exception {
@@ -51,44 +54,54 @@ public class IoTDBPipeSinkIT {
}
@Test
- public void testShowPipeSinkType() {
+ public void testOperatePipe() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- String expectedHeader = ColumnHeaderConstant.COLUMN_PIPESINK_TYPE + ",";
- String[] expectedRetSet = new String[] {"IoTDB,", "ExternalPipe,"};
- try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINKTYPE")) {
- assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
- }
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- @Test
- public void testOperatePipeSink() {
- try (Connection connection = EnvFactory.getEnv().getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("CREATE PIPESINK demo1 AS IoTDB
(ip='192.168.0.1',port='6677');");
- statement.execute("CREATE PIPESINK demo2 AS IoTDB
(ip='192.168.0.2',port='6678');");
- statement.execute("CREATE PIPESINK demo3 AS IoTDB;");
- statement.execute("DROP PIPESINK demo2;");
+ String ip = EnvFactory.getEnv().getDataNodeWrapperList().get(0).getIp();
+ int port = EnvFactory.getEnv().getDataNodeWrapperList().get(0).getPort();
+ statement.execute(
+ String.format("CREATE PIPESINK demo AS IoTDB (ip='%s',port='%d');",
ip, port));
+ statement.execute("CREATE PIPE p to demo;");
String expectedHeader =
- ColumnHeaderConstant.COLUMN_PIPESINK_NAME
+ ColumnHeaderConstant.COLUMN_PIPE_CREATE_TIME
+ + ","
+ + ColumnHeaderConstant.COLUMN_PIPE_NAME
+ + ","
+ + ColumnHeaderConstant.COLUMN_PIPE_ROLE
+ + ","
+ + ColumnHeaderConstant.COLUMN_PIPE_REMOTE
+ ","
- + ColumnHeaderConstant.COLUMN_PIPESINK_TYPE
+ + ColumnHeaderConstant.COLUMN_PIPE_STATUS
+ ","
- + ColumnHeaderConstant.COLUMN_PIPESINK_ATTRIBUTES
+ + ColumnHeaderConstant.COLUMN_PIPE_MESSAGE
+ ",";
- try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK")) {
+
+ String createTime = getCreateTime("p");
+ try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
+ String[] expectedRetSet =
+ new String[] {String.format("%s,p,sender,demo,STOP,,",
createTime)};
+ assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
+ }
+ statement.execute("START PIPE p;");
+ Thread.sleep(1000); // wait 1000 ms to start thread
+ try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
String[] expectedRetSet =
new String[] {
- "demo3,IoTDB,ip='127.0.0.1',port=6670,",
"demo1,IoTDB,ip='192.168.0.1',port=6677,"
+ String.format("%s,p,sender,demo,RUNNING,,", createTime),
+ String.format("%s,p,receiver,0.0.0.0,RUNNING,,", createTime),
};
assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
}
- try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK
demo3")) {
- String[] expectedRetSet = new String[]
{"demo3,IoTDB,ip='127.0.0.1',port=6670,"};
+ statement.execute("STOP PIPE p;");
+ try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
+ String[] expectedRetSet =
+ new String[] {String.format("%s,p,sender,demo,STOP,,",
createTime)};
+ assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
+ }
+ statement.execute("DROP PIPE p;");
+ try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
+ String[] expectedRetSet =
+ new String[] {String.format("%s,p,sender,demo,DROP,,",
createTime)};
assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
}
} catch (Exception e) {
@@ -96,4 +109,17 @@ public class IoTDBPipeSinkIT {
Assert.fail();
}
}
+
+ private String getCreateTime(String pipeName) throws Exception {
+ String createTime = "";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery("SHOW PIPE " +
pipeName)) {
+ Assert.assertTrue(resultSet.next());
+ createTime = resultSet.getString(1);
+ }
+ }
+ Assert.assertNotEquals("", createTime);
+ return createTime;
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
index 51dbcf99de..573c2dadf4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
@@ -83,12 +83,12 @@ public class IoTDBPipeSinkIT {
try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK")) {
String[] expectedRetSet =
new String[] {
- "demo3,IoTDB,ip='127.0.0.1',port=6670,",
"demo1,IoTDB,ip='192.168.0.1',port=6677,"
+ "demo3,IoTDB,ip='127.0.0.1',port=6667,",
"demo1,IoTDB,ip='192.168.0.1',port=6677,"
};
assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
}
try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINK
demo3")) {
- String[] expectedRetSet = new String[]
{"demo3,IoTDB,ip='127.0.0.1',port=6670,"};
+ String[] expectedRetSet = new String[]
{"demo3,IoTDB,ip='127.0.0.1',port=6667,"};
assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
}
} catch (Exception e) {
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index a5be8af83b..588db66902 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@Ignore
@Category({LocalStandaloneTest.class})
public class IoTDBSyncSenderIT {
private boolean enableSeqSpaceCompaction;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
index 81f54e7026..8e17263cfb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
@@ -26,6 +26,8 @@ public class SyncConstant {
public static final String SYNC_SYS_DIR = "sys";
public static final String FILE_DATA_DIR_NAME = "file-data";
+ public static final String ROLE_SENDER = "sender";
+ public static final String ROLE_RECEIVER = "receiver";
// pipe log: serialNumber + SEPARATOR + SUFFIX
public static final String PIPE_LOG_DIR_NAME = "pipe-log";
@@ -54,7 +56,7 @@ public class SyncConstant {
// data config
public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
- public static final int DEFAULT_PIPE_SINK_PORT = 6670;
+ public static final int DEFAULT_PIPE_SINK_PORT = 6667;
public static final Long HEARTBEAT_DELAY_SECONDS = 30L;
public static final int CONNECT_TIMEOUT_MILLISECONDS = 1_000;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index c14ebd6a93..f50abbf5a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.wal.WALManager;
@@ -665,6 +666,7 @@ public class StorageEngineV2 implements IService {
.deleteWALNode(
region.getStorageGroupName() + FILE_NAME_SEPARATOR +
region.getDataRegionId());
}
+ SyncService.getInstance().deleteSyncManager(region.getDataRegionId());
} catch (Exception e) {
logger.error(
"Error occurs when deleting data region {}-{}",
@@ -682,6 +684,10 @@ public class StorageEngineV2 implements IService {
return dataRegionMap.get(regionId);
}
+ public List<DataRegion> getAllDataRegions() {
+ return new ArrayList<>(dataRegionMap.values());
+ }
+
public List<DataRegionId> getAllDataRegionIds() {
return new ArrayList<>(dataRegionMap.keySet());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e58f55202e..7f959fb3fa 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -88,7 +88,8 @@ import org.apache.iotdb.db.service.SettleService;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.UpgradeUtils;
@@ -275,9 +276,6 @@ public class DataRegion {
private IDTable idTable;
- /** used to collect TsFiles in this virtual storage group */
- private TsFileSyncManager tsFileSyncManager =
TsFileSyncManager.getInstance();
-
/**
* constrcut a storage group processor
*
@@ -751,8 +749,9 @@ public class DataRegion {
TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
if (!recoverPerformer.canWrite()) {
// cannot write, just close it
- if (tsFileSyncManager.isEnableSync()) {
- tsFileSyncManager.collectRealTimeTsFile(tsFileResource.getTsFile());
+ for (ISyncManager syncManager :
+ SyncService.getInstance().getOrCreateSyncManager(dataRegionId)) {
+ syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
}
try {
tsFileResource.close();
@@ -2414,8 +2413,9 @@ public class DataRegion {
tsFileResource.getProcessor().deleteDataInMemory(deletion,
devicePaths);
}
- if (tsFileSyncManager.isEnableSync()) {
- tsFileSyncManager.collectRealTimeDeletion(deletion, storageGroupName);
+ for (ISyncManager syncManager :
+ SyncService.getInstance().getOrCreateSyncManager(dataRegionId)) {
+ syncManager.syncRealTimeDeletion(deletion);
}
// add a record in case of rollback
@@ -3677,14 +3677,15 @@ public class DataRegion {
/**
* Used to collect history TsFiles(i.e. the tsfile whose memtable == null).
*
+ * @param syncManager ISyncManager which invokes to collect history TsFile
* @param dataStartTime only collect history TsFiles which contains the data
after the
* dataStartTime
* @return A list, which contains TsFile path
*/
- public List<File> collectHistoryTsFileForSync(long dataStartTime) {
+ public List<File> collectHistoryTsFileForSync(ISyncManager syncManager, long
dataStartTime) {
writeLock("Collect data for sync");
try {
- return tsFileManager.collectHistoryTsFileForSync(dataStartTime);
+ return tsFileManager.collectHistoryTsFileForSync(syncManager,
dataStartTime);
} finally {
writeUnlock();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index f09347c8d1..32c70b13da 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -373,12 +373,12 @@ public class TsFileManager {
return unsequenceRecoverTsFileResources;
}
- public List<File> collectHistoryTsFileForSync(long dataStartTime) {
+ public List<File> collectHistoryTsFileForSync(ISyncManager syncManager, long
dataStartTime) {
readLock();
try {
List<File> historyTsFiles = new ArrayList<>();
- collectTsFile(historyTsFiles, getTsFileList(true), dataStartTime);
- collectTsFile(historyTsFiles, getTsFileList(false), dataStartTime);
+ collectTsFile(historyTsFiles, getTsFileList(true), syncManager,
dataStartTime);
+ collectTsFile(historyTsFiles, getTsFileList(false), syncManager,
dataStartTime);
return historyTsFiles;
} finally {
readUnlock();
@@ -386,8 +386,10 @@ public class TsFileManager {
}
private void collectTsFile(
- List<File> historyTsFiles, List<TsFileResource> tsFileResources, long
dataStartTime) {
- TsFileSyncManager syncManager = TsFileSyncManager.getInstance();
+ List<File> historyTsFiles,
+ List<TsFileResource> tsFileResources,
+ ISyncManager syncManager,
+ long dataStartTime) {
for (TsFileResource tsFileResource : tsFileResources) {
if (tsFileResource.getFileEndTime() < dataStartTime) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c1f7e98f5f..605cc979db 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -59,7 +59,8 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -166,9 +167,6 @@ public class TsFileProcessor {
/** flush file listener */
private List<FlushListener> flushListeners = new ArrayList<>();
- /** used to collct this TsFile for sync */
- private TsFileSyncManager tsFileSyncManager =
TsFileSyncManager.getInstance();
-
@SuppressWarnings("squid:S107")
TsFileProcessor(
String storageGroupName,
@@ -839,8 +837,10 @@ public class TsFileProcessor {
if (!flushingMemTables.isEmpty()) {
modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast()));
}
- if (tsFileSyncManager.isEnableSync()) {
- tsFileSyncManager.collectRealTimeDeletion(deletion, storageGroupName);
+ for (ISyncManager syncManager :
+ SyncService.getInstance()
+
.getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+ syncManager.syncRealTimeDeletion(deletion);
}
} finally {
flushQueryLock.writeLock().unlock();
@@ -996,8 +996,10 @@ public class TsFileProcessor {
// When invoke closing TsFile after insert data to memTable, we
shouldn't flush until invoke
// flushing memTable in System module.
addAMemtableIntoFlushingList(tmpMemTable);
- if (tsFileSyncManager.isEnableSync()) {
- tsFileSyncManager.collectRealTimeTsFile(tsFileResource.getTsFile());
+ for (ISyncManager syncManager :
+ SyncService.getInstance()
+
.getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+ syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
}
logger.info("Memtable {} has been added to flushing list",
tmpMemTable);
shouldClose = true;
@@ -1426,8 +1428,10 @@ public class TsFileProcessor {
long closeStartTime = System.currentTimeMillis();
writer.endFile();
tsFileResource.serialize();
- if (tsFileSyncManager.isEnableSync()) {
- tsFileSyncManager.collectRealTimeResource(tsFileResource.getTsFile());
+ for (ISyncManager syncManager :
+ SyncService.getInstance()
+
.getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+ syncManager.syncRealTimeResource(tsFileResource.getTsFile());
}
logger.info("Ended file {}", tsFileResource);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
index e079e77289..35e55dea8a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -466,15 +465,6 @@ public class StorageGroupManager {
}
}
- /** collect all tsfiles whose memtable == null for sync */
- public List<File> collectHistoryTsFileForSync(long dataStartTime) {
- List<File> historyTsFiles = new ArrayList<>();
- for (DataRegion processor : this.dataRegion) {
-
historyTsFiles.addAll(processor.collectHistoryTsFileForSync(dataStartTime));
- }
- return historyTsFiles;
- }
-
/** only for test */
public void reset() {
Arrays.fill(dataRegion, null);
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index c6be69d323..25c1c9332d 100644
---
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -50,6 +50,9 @@ import
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -65,6 +68,7 @@ import
org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import
org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -79,19 +83,17 @@ import
org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.sync.SyncService;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -116,8 +118,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-
/**
* This class simulates the behaviour of configNode to manage the configs
locally. The schema
* configs include storage group, schema region and template. The data config
is dataRegion.
@@ -274,10 +274,6 @@ public class LocalConfigNode {
schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
}
- if (SchemaSyncManager.getInstance().isEnableSync()) {
- SchemaSyncManager.getInstance().syncMetadataPlan(new
SetStorageGroupPlan(storageGroup));
- }
-
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(1);
}
@@ -291,22 +287,12 @@ public class LocalConfigNode {
dataPartitionTable.deleteStorageGroup(storageGroup);
}
- DeleteTimeSeriesPlan deleteTimeSeriesPlan =
- SchemaSyncManager.getInstance().isEnableSync()
- ? SchemaSyncManager.getInstance()
- .splitDeleteTimeseriesPlanByDevice(
- storageGroup.concatNode(MULTI_LEVEL_PATH_WILDCARD))
- : null;
-
deleteSchemaRegionsInStorageGroup(
storageGroup,
schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
for (Template template : templateManager.getTemplateMap().values()) {
templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
}
- if (SchemaSyncManager.getInstance().isEnableSync()) {
- SchemaSyncManager.getInstance().syncMetadataPlan(deleteTimeSeriesPlan);
- }
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
@@ -1370,4 +1356,45 @@ public class LocalConfigNode {
return Collections.singletonList(syncService.getPipeSink(pipeSinkName));
}
}
+
+ public TSStatus createPipe(CreatePipeStatement createPipeStatement) {
+ try {
+ syncService.addPipe(createPipeStatement);
+ } catch (PipeException e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ public TSStatus startPipe(String pipeName) {
+ try {
+ syncService.startPipe(pipeName);
+ } catch (PipeException e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ public TSStatus stopPipe(String pipeName) {
+ try {
+ syncService.stopPipe(pipeName);
+ } catch (PipeException e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ public TSStatus dropPipe(String pipeName) {
+ try {
+ syncService.dropPipe(pipeName);
+ } catch (PipeException e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ public TShowPipeResp showPipe(String pipeName) {
+ List<TPipeInfo> pipeInfos = SyncService.getInstance().showPipe(pipeName);
+ return new
TShowPipeResp().setPipeInfoList(pipeInfos).setStatus(StatusUtils.OK);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 6c3e308799..90394b6041 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.external.api.ISeriesNumerLimiter;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -178,7 +177,6 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
// device -> DeviceMNode
private LoadingCache<PartialPath, IMNode> mNodeCache;
private TagManager tagManager;
- private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
private final ISeriesNumerLimiter seriesNumerLimiter;
@@ -646,9 +644,6 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
plan.setTagOffset(offset);
writeToMLog(plan);
- if (syncManager.isEnableSync()) {
- syncManager.syncMetadataPlan(plan);
- }
}
if (offset != -1) {
leafMNode.setOffset(offset);
@@ -792,9 +787,6 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
plan.setTagOffsets(tagOffsets);
writeToMLog(plan);
- if (syncManager.isEnableSync()) {
- syncManager.syncMetadataPlan(plan);
- }
}
tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -861,9 +853,6 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
writeToMLog(deleteTimeSeriesPlan);
- if (syncManager.isEnableSync()) {
- syncManager.syncMetadataPlan(deleteTimeSeriesPlan);
- }
}
} catch (DeleteFailedException e) {
failedNames.add(e.getName());
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 2194b4126a..051f78898f 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -76,7 +76,6 @@ import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.external.api.ISeriesNumerLimiter;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -172,7 +171,6 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
// device -> DeviceMNode
private LoadingCache<PartialPath, IMNode> mNodeCache;
private TagManager tagManager;
- private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
private final ISeriesNumerLimiter seriesNumerLimiter;
@@ -538,9 +536,6 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
}
plan.setTagOffset(offset);
logWriter.createTimeseries(plan);
- if (syncManager.isEnableSync()) {
- syncManager.syncMetadataPlan(plan);
- }
}
if (offset != -1) {
leafMNode.setOffset(offset);
@@ -708,9 +703,6 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
}
plan.setTagOffsets(tagOffsets);
logWriter.createAlignedTimeseries(plan);
- if (syncManager.isEnableSync()) {
- syncManager.syncMetadataPlan(plan);
- }
}
tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -783,9 +775,6 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
}
deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
logWriter.deleteTimeseries(deleteTimeSeriesPlan);
- if (syncManager.isEnableSync()) {
- syncManager.syncMetadataPlan(deleteTimeSeriesPlan);
- }
}
} catch (DeleteFailedException e) {
failedNames.add(e.getName());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 55faa87dab..e642a4504f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -96,6 +96,14 @@ public class ColumnHeaderConstant {
public static final String COLUMN_PIPESINK_NAME = "name";
public static final String COLUMN_PIPESINK_ATTRIBUTES = "attributes";
+ // column names for show pipe
+ public static final String COLUMN_PIPE_CREATE_TIME = "create time";
+ public static final String COLUMN_PIPE_NAME = "name";
+ public static final String COLUMN_PIPE_ROLE = "role";
+ public static final String COLUMN_PIPE_REMOTE = "remote";
+ public static final String COLUMN_PIPE_STATUS = "status";
+ public static final String COLUMN_PIPE_MESSAGE = "message";
+
public static final List<ColumnHeader> lastQueryColumnHeaders =
ImmutableList.of(
new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
@@ -235,4 +243,13 @@ public class ColumnHeaderConstant {
new ColumnHeader(COLUMN_PIPESINK_NAME, TSDataType.TEXT),
new ColumnHeader(COLUMN_PIPESINK_TYPE, TSDataType.TEXT),
new ColumnHeader(COLUMN_PIPESINK_ATTRIBUTES, TSDataType.TEXT));
+
+ public static final List<ColumnHeader> showPipeColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(COLUMN_PIPE_CREATE_TIME, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_PIPE_NAME, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_PIPE_ROLE, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_PIPE_REMOTE, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_PIPE_STATUS, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_PIPE_MESSAGE, TSDataType.TEXT));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index b079f3ad55..b6f9f5a6c3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -120,4 +120,8 @@ public class DatasetHeaderFactory {
public static DatasetHeader getShowPipeSinkHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPipeSinkColumnHeaders,
true);
}
+
+ public static DatasetHeader getShowPipeHeader() {
+ return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f8853dd43e..0f770e6509 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -77,8 +77,13 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -659,7 +664,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> createPipe() {
+ public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement
createPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
future.setException(
new IoTDBException(
@@ -680,7 +685,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> dropPipe() {
+ public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement
dropPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
future.setException(
new IoTDBException(
@@ -701,7 +706,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> showPipe() {
+ public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement
showPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
future.setException(
new IoTDBException(
@@ -722,7 +727,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> startPipe() {
+ public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement
startPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
future.setException(
new IoTDBException(
@@ -732,7 +737,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> stopPipe() {
+ public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement
stopPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
future.setException(
new IoTDBException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index e4f03578b2..861f42277c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -39,8 +39,13 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
import com.google.common.util.concurrent.SettableFuture;
@@ -119,13 +124,13 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> showPipeSink(ShowPipeSinkStatement
showPipeSinkStatement);
- SettableFuture<ConfigTaskResult> dropPipe();
+ SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement
dropPipeStatement);
- SettableFuture<ConfigTaskResult> createPipe();
+ SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement
createPipeStatement);
- SettableFuture<ConfigTaskResult> showPipe();
+ SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement
startPipeStatement);
- SettableFuture<ConfigTaskResult> startPipe();
+ SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement
stopPipeStatement);
- SettableFuture<ConfigTaskResult> stopPipe();
+ SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement
showPipeStatement);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 3d48e6774b..27aa838107 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.trigger.enums.TriggerEvent;
import org.apache.iotdb.commons.trigger.enums.TriggerType;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupInfo;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
@@ -38,6 +39,7 @@ import
org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupT
import
org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
@@ -52,8 +54,13 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -466,16 +473,6 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
- @Override
- public SettableFuture<ConfigTaskResult> createPipe() {
- SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing create pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
- return future;
- }
-
@Override
public SettableFuture<ConfigTaskResult> createPipeSink(
CreatePipeSinkStatement createPipeSinkStatement) {
@@ -514,42 +511,59 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> dropPipe() {
+ public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement
createPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing drop pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ TSStatus tsStatus =
LocalConfigNode.getInstance().createPipe(createPipeStatement);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> showPipe() {
+ public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement
startPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing show pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ TSStatus tsStatus =
LocalConfigNode.getInstance().startPipe(startPipeStatement.getPipeName());
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> startPipe() {
+ public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement
stopPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing Start pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ TSStatus tsStatus =
LocalConfigNode.getInstance().stopPipe(stopPipeStatement.getPipeName());
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> stopPipe() {
+ public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement
dropPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing stop pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ TSStatus tsStatus =
LocalConfigNode.getInstance().dropPipe(dropPipeStatement.getPipeName());
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement
showPipeStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TShowPipeResp showPipeResp =
+
LocalConfigNode.getInstance().showPipe(showPipeStatement.getPipeName());
+ ShowPipeTask.buildTSBlock(showPipeResp.getPipeInfoList(), future);
return future;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
index e01b159bf2..9b22eac678 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/CreatePipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public class CreatePipeTask implements IConfigTask {
- private CreatePipeStatement createPipeStatement;
+ private final CreatePipeStatement createPipeStatement;
public CreatePipeTask(CreatePipeStatement createPipeStatement) {
this.createPipeStatement = createPipeStatement;
@@ -37,6 +37,6 @@ public class CreatePipeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.createPipe();
+ return configTaskExecutor.createPipe(createPipeStatement);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
index b1236924e3..8a878de2dd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/DropPipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public class DropPipeTask implements IConfigTask {
- private DropPipeStatement dropPipeStatement;
+ private final DropPipeStatement dropPipeStatement;
public DropPipeTask(DropPipeStatement dropPipeStatement) {
this.dropPipeStatement = dropPipeStatement;
@@ -37,6 +37,6 @@ public class DropPipeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.dropPipe();
+ return configTaskExecutor.dropPipe(dropPipeStatement);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
index 6ea56964b1..180cdd2d1d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
@@ -19,16 +19,30 @@
package org.apache.iotdb.db.mpp.plan.execution.config.sys.sync;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.stream.Collectors;
public class ShowPipeTask implements IConfigTask {
- private ShowPipeStatement showPipeStatement;
+ private final ShowPipeStatement showPipeStatement;
public ShowPipeTask(ShowPipeStatement showPipeStatement) {
this.showPipeStatement = showPipeStatement;
@@ -37,6 +51,29 @@ public class ShowPipeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.showPipe();
+ return configTaskExecutor.showPipe(showPipeStatement);
+ }
+
+ public static void buildTSBlock(
+ List<TPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> future) {
+ List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.showPipeColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ for (TPipeInfo tPipeInfo : pipeInfoList) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder
+ .getColumnBuilder(0)
+ .writeBinary(new
Binary(DatetimeUtils.convertLongToDate(tPipeInfo.getCreateTime())));
+ builder.getColumnBuilder(1).writeBinary(new
Binary(tPipeInfo.getPipeName()));
+ builder.getColumnBuilder(2).writeBinary(new Binary(tPipeInfo.getRole()));
+ builder.getColumnBuilder(3).writeBinary(new
Binary(tPipeInfo.getRemote()));
+ builder.getColumnBuilder(4).writeBinary(new
Binary(tPipeInfo.getStatus()));
+ builder.getColumnBuilder(5).writeBinary(new
Binary(tPipeInfo.getMessage()));
+ builder.declarePosition();
+ }
+ DatasetHeader datasetHeader = DatasetHeaderFactory.getShowPipeHeader();
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS,
builder.build(), datasetHeader));
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
index aa10217815..03dc2143f4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StartPipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public class StartPipeTask implements IConfigTask {
- private StartPipeStatement startPipeStatement;
+ private final StartPipeStatement startPipeStatement;
public StartPipeTask(StartPipeStatement startPipeStatement) {
this.startPipeStatement = startPipeStatement;
@@ -37,6 +37,6 @@ public class StartPipeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.startPipe();
+ return configTaskExecutor.startPipe(startPipeStatement);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
index 14ae9a1a41..e53b4f93c5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public class StopPipeTask implements IConfigTask {
- private StopPipeStatement stopPipeStatement;
+ private final StopPipeStatement stopPipeStatement;
public StopPipeTask(StopPipeStatement stopPipeStatement) {
this.stopPipeStatement = stopPipeStatement;
@@ -37,6 +37,6 @@ public class StopPipeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.stopPipe();
+ return configTaskExecutor.stopPipe(stopPipeStatement);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 1d29cb3c6e..143c6d315d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -2823,6 +2823,8 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
}
if (ctx.syncAttributeClauses() != null) {
createPipeStatement.setPipeAttributes(parseSyncAttributeClauses(ctx.syncAttributeClauses()));
+ } else {
+ createPipeStatement.setPipeAttributes(new HashMap<>());
}
return createPipeStatement;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
index e2763b9c6b..a3d59b52b8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeStatement.java
@@ -20,13 +20,16 @@
package org.apache.iotdb.db.mpp.plan.statement.sys.sync;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -87,4 +90,39 @@ public class CreatePipeStatement extends Statement
implements IConfigStatement {
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitCreatePipe(this, context);
}
+
+ public static CreatePipeStatement parseString(String parsedString) throws
IOException {
+ String[] split =
parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ if (split.length < 4) {
+ throw new IOException("Parsing CreatePipePlan error. Attributes is less
than expected.");
+ }
+ CreatePipeStatement statement = new
CreatePipeStatement(StatementType.CREATE_PIPE);
+ statement.setPipeName(split[0]);
+ statement.setPipeSinkName(split[1]);
+ statement.setStartTime(Long.parseLong(split[2]));
+ int size = (Integer.parseInt(split[3]) << 1);
+ if (split.length != (size + 4)) {
+ throw new IOException("Parsing CreatePipePlan error. Attributes number
is wrong.");
+ }
+ Map<String, String> attributes = new HashMap<>();
+ for (int i = 0; i < size; i += 2) {
+ attributes.put(split[i + 4], split[i + 5]);
+ }
+ statement.setPipeAttributes(attributes);
+ return statement;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+
builder.append(pipeName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+
builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+
builder.append(startTime).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+
builder.append(pipeAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ for (Map.Entry<String, String> entry : pipeAttributes.entrySet()) {
+
builder.append(entry.getKey()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+
builder.append(entry.getValue()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ }
+ return builder.toString();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 62a8699a2b..285c584228 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -28,9 +28,11 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
@@ -41,6 +43,7 @@ import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -59,6 +62,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +70,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -158,6 +163,7 @@ public class SyncService implements IService {
// region Interfaces and Implementation of Pipe
+ // TODO(sync): delete this in new-standalone version
public synchronized void addPipe(CreatePipePlan plan) throws PipeException {
// check plan
long currentTime = DatetimeUtils.currentTime();
@@ -197,6 +203,45 @@ public class SyncService implements IService {
}
}
+ public synchronized void addPipe(CreatePipeStatement statement) throws
PipeException {
+ // check statement
+ long currentTime = DatetimeUtils.currentTime();
+ if (statement.getStartTime() > currentTime) {
+ throw new PipeException(
+ String.format(
+ "Start time %s is later than current time %s, this is not
supported yet.",
+ DatetimeUtils.convertLongToDate(statement.getStartTime()),
+ DatetimeUtils.convertLongToDate(currentTime)));
+ }
+ // add pipe
+ TSStatus status = syncInfoFetcher.addPipe(statement, currentTime);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(status.message);
+ }
+
+ PipeSink runningPipeSink = getPipeSink(statement.getPipeSinkName());
+ runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(statement,
runningPipeSink, currentTime);
+ if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
+ try {
+ senderManager = new SenderManager(runningPipe, (IoTDBPipeSink)
runningPipeSink);
+ } catch (ClassCastException e) {
+ logger.error(
+ String.format(
+ "Cast Class to %s error when create pipe %s.",
+ IoTDBPipeSink.class.getName(), statement.getPipeName()),
+ e);
+ runningPipe = null;
+ throw new PipeException(
+ String.format(
+ "Wrong pipeSink type %s for create pipe %s",
+ runningPipeSink.getType(), runningPipeSink.getPipeSinkName()));
+ }
+ } else { // for external pipe
+ // == start ExternalPipeProcessor for send data to external pipe plugin
+ startExternalPipeManager(false);
+ }
+ }
+
public synchronized void stopPipe(String pipeName) throws PipeException {
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
@@ -312,6 +357,40 @@ public class SyncService implements IService {
}
}
+ public List<TPipeInfo> showPipe(String pipeName) {
+ boolean showAll = StringUtils.isEmpty(pipeName);
+ List<TPipeInfo> list = new ArrayList<>();
+ // show pipe in sender
+ for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
+ if (showAll || pipeName.equals(pipe.getPipeName())) {
+ TPipeInfo tPipeInfo =
+ new TPipeInfo(
+ pipe.getCreateTime(),
+ pipe.getPipeName(),
+ SyncConstant.ROLE_SENDER,
+ pipe.getPipeSinkName(),
+ pipe.getStatus().name(),
+ "");
+ list.add(tPipeInfo);
+ }
+ }
+ // show pipe in receiver
+ for (TSyncIdentityInfo identityInfo :
receiverManager.getAllTSyncIdentityInfos()) {
+ if (showAll || pipeName.equals(identityInfo.getPipeName())) {
+ TPipeInfo tPipeInfo =
+ new TPipeInfo(
+ identityInfo.getCreateTime(),
+ identityInfo.getPipeName(),
+ SyncConstant.ROLE_RECEIVER,
+ identityInfo.getAddress(),
+ Pipe.PipeStatus.RUNNING.name(),
+ "");
+ list.add(tPipeInfo);
+ }
+ }
+ return list;
+ }
+
public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
boolean showAll = "".equals(plan.getPipeName());
// show pipe in sender
@@ -523,6 +602,22 @@ public class SyncService implements IService {
}
}
+ public List<ISyncManager> getOrCreateSyncManager(String dataRegionId) {
+ // TODO(sync): maybe add cache to accelerate
+ List<ISyncManager> syncManagerList = new ArrayList<>();
+ if (runningPipe != null) {
+ syncManagerList.add(runningPipe.getOrCreateSyncManager(dataRegionId));
+ }
+ return syncManagerList;
+ }
+
+ /** This method will be called before deleting dataRegion */
+ public void deleteSyncManager(String dataRegionId) {
+ if (runningPipe != null) {
+ runningPipe.deleteSyncManager(dataRegionId);
+ }
+ }
+
@TestOnly
public SenderManager getSenderManager() {
return senderManager;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index a419d5b5c3..9e92d8360a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.common;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
@@ -45,9 +46,11 @@ public interface ISyncInfoFetcher {
// region Interfaces of Pipe
- // TODO: use CreatePipeNode as parameter
+ // TODO(sync): delete this in new-standalone version
TSStatus addPipe(CreatePipePlan plan, long createTime);
+ TSStatus addPipe(CreatePipeStatement createPipeStatement, long createTime);
+
TSStatus stopPipe(String pipeName);
TSStatus startPipe(String pipeName);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 7f10723153..9ba37501e0 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -22,8 +22,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
@@ -102,10 +103,20 @@ public class LocalSyncInfoFetcher implements
ISyncInfoFetcher {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ @Override
+ public TSStatus addPipe(CreatePipeStatement createPipeStatement, long
createTime) {
+ try {
+ syncInfo.addPipe(createPipeStatement, createTime);
+ } catch (PipeException | IOException e) {
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
@Override
public TSStatus stopPipe(String pipeName) {
try {
- syncInfo.operatePipe(pipeName, Operator.OperatorType.STOP_PIPE);
+ syncInfo.operatePipe(pipeName, StatementType.STOP_PIPE);
} catch (PipeException | IOException e) {
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
@@ -115,7 +126,7 @@ public class LocalSyncInfoFetcher implements
ISyncInfoFetcher {
@Override
public TSStatus startPipe(String pipeName) {
try {
- syncInfo.operatePipe(pipeName, Operator.OperatorType.START_PIPE);
+ syncInfo.operatePipe(pipeName, StatementType.START_PIPE);
} catch (PipeException | IOException e) {
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
@@ -125,7 +136,7 @@ public class LocalSyncInfoFetcher implements
ISyncInfoFetcher {
@Override
public TSStatus dropPipe(String pipeName) {
try {
- syncInfo.operatePipe(pipeName, Operator.OperatorType.DROP_PIPE);
+ syncInfo.operatePipe(pipeName, StatementType.DROP_PIPE);
} catch (PipeException | IOException e) {
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 218e0994e4..560605b5e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -22,8 +22,9 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
@@ -144,7 +145,7 @@ public class SyncInfo {
// endregion
// region Implement of Pipe
-
+ // TODO: delete this in new-standalone version
public void addPipe(CreatePipePlan plan, long createTime) throws
PipeException, IOException {
// common check
if (runningPipe != null && runningPipe.getStatus() !=
Pipe.PipeStatus.DROP) {
@@ -163,10 +164,32 @@ public class SyncInfo {
syncLogWriter.addPipe(plan, createTime);
}
- public void operatePipe(String pipeName, Operator.OperatorType operatorType)
+ public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
+ throws PipeException, IOException {
+ // common check
+ if (runningPipe != null && runningPipe.getStatus() !=
Pipe.PipeStatus.DROP) {
+ throw new PipeException(
+ String.format(
+ "Pipe %s is %s, please retry after drop it.",
+ runningPipe.getPipeName(), runningPipe.getStatus().name()));
+ }
+ if (!isPipeSinkExist(createPipeStatement.getPipeSinkName())) {
+ throw new PipeException(
+ String.format("Can not find pipeSink %s.",
createPipeStatement.getPipeSinkName()));
+ }
+
+ PipeSink runningPipeSink =
getPipeSink(createPipeStatement.getPipeSinkName());
+ runningPipe =
+ SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
+ createPipeStatement, runningPipeSink, createTime);
+ pipes.add(runningPipe);
+ syncLogWriter.addPipe(createPipeStatement, createTime);
+ }
+
+ public void operatePipe(String pipeName, StatementType statementType)
throws PipeException, IOException {
checkIfPipeExistAndRunning(pipeName);
- switch (operatorType) {
+ switch (statementType) {
case START_PIPE:
runningPipe.start();
break;
@@ -177,9 +200,9 @@ public class SyncInfo {
runningPipe.drop();
break;
default:
- throw new PipeException("Unknown operatorType " + operatorType);
+ throw new PipeException("Unknown operatorType " + statementType);
}
- syncLogWriter.operatePipe(pipeName, operatorType);
+ syncLogWriter.operatePipe(pipeName, statementType);
}
public List<PipeInfo> getAllPipeInfos() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index a9e91c09c7..344b4b8f08 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
@@ -128,11 +128,11 @@ public class SyncLogReader {
case CREATE_PIPE:
readLine = br.readLine();
lineNumber += 1;
- CreatePipePlan pipePlan = CreatePipePlan.parseString(readLine);
+ CreatePipeStatement createPipeStatement =
CreatePipeStatement.parseString(readLine);
runningPipe =
SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
- pipePlan,
- pipeSinks.get(pipePlan.getPipeSinkName()),
+ createPipeStatement,
+ pipeSinks.get(createPipeStatement.getPipeSinkName()),
Long.parseLong(parseStrings[1]));
pipes.add(runningPipe);
break;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
index 01e3068f15..556e85f337 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.sync.common.persistence;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
@@ -55,6 +57,7 @@ public class SyncLogWriter {
}
}
+ // TODO(sync): delete this in new-standalone version
public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws
IOException {
getBufferedWriter();
pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
@@ -83,6 +86,7 @@ public class SyncLogWriter {
pipeInfoWriter.flush();
}
+ // TODO(sync): delete this in new-standalone version
public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime)
throws IOException {
getBufferedWriter();
pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPE.name());
@@ -94,9 +98,20 @@ public class SyncLogWriter {
pipeInfoWriter.flush();
}
- public synchronized void operatePipe(String pipeName, Operator.OperatorType
type)
+ public synchronized void addPipe(CreatePipeStatement createPipeStatement,
long pipeCreateTime)
throws IOException {
getBufferedWriter();
+ pipeInfoWriter.write(createPipeStatement.getType().name());
+ pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
+ pipeInfoWriter.write(String.valueOf(pipeCreateTime));
+ pipeInfoWriter.newLine();
+ pipeInfoWriter.write(createPipeStatement.toString());
+ pipeInfoWriter.newLine();
+ pipeInfoWriter.flush();
+ }
+
+ public synchronized void operatePipe(String pipeName, StatementType type)
throws IOException {
+ getBufferedWriter();
pipeInfoWriter.write(type.name());
pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
pipeInfoWriter.write(pipeName);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index c46e66a095..70746879f2 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -18,21 +18,33 @@
*/
package org.apache.iotdb.db.sync.receiver.load;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
-import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
/** This loader is used to load tsFiles. If .mods file exists, it will be
loaded as well. */
public class TsFileLoader implements ILoader {
+ private static final Logger logger =
LoggerFactory.getLogger(TsFileLoader.class);
+ private static PlanExecutor planExecutor;
- private File tsFile;
+ static {
+ try {
+ planExecutor = new PlanExecutor();
+ } catch (QueryProcessException e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ private final File tsFile;
public TsFileLoader(File tsFile) {
this.tsFile = tsFile;
@@ -41,24 +53,14 @@ public class TsFileLoader implements ILoader {
@Override
public void load() throws PipeDataLoadException {
try {
- TsFileResource tsFileResource = new TsFileResource(tsFile);
- tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
- FileLoaderUtils.loadOrGenerateResource(tsFileResource);
- List<TsFileResource> splitResources = new ArrayList();
- if (tsFileResource.isSpanMultiTimePartitions()) {
- TsFileSplitByPartitionTool.rewriteTsFile(tsFileResource,
splitResources);
- tsFileResource.writeLock();
- tsFileResource.removeModFile();
- tsFileResource.writeUnlock();
- }
-
- if (splitResources.isEmpty()) {
- splitResources.add(tsFileResource);
- }
-
- for (TsFileResource resource : splitResources) {
- StorageEngine.getInstance().loadNewTsFile(resource, false);
- }
+ PhysicalPlan plan =
+ new OperateFilePlan(
+ tsFile,
+ Operator.OperatorType.LOAD_FILES,
+ true,
+
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(),
+ true);
+ planExecutor.processNonQuery(plan);
} catch (Exception e) {
throw new PipeDataLoadException(e.getMessage());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/ISyncManager.java
similarity index 50%
copy from
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
copy to
server/src/main/java/org/apache/iotdb/db/sync/sender/manager/ISyncManager.java
index 14ae9a1a41..33bdf79f33 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/StopPipeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/ISyncManager.java
@@ -16,27 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.sync.sender.manager;
-package org.apache.iotdb.db.mpp.plan.execution.config.sys.sync;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
-import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
-import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
+import java.io.File;
+import java.util.List;
-import com.google.common.util.concurrent.ListenableFuture;
+/**
+ * ISyncManager is designed for collect all history TsFiles(i.e. before the
pipe start time, all
+ * tsfiles whose memtable is set to null.) and realtime TsFiles for registered
{@link TsFilePipe}.
+ */
+public interface ISyncManager {
+ /** tsfile */
+ void syncRealTimeDeletion(Deletion deletion);
+
+ void syncRealTimeTsFile(File tsFile);
-public class StopPipeTask implements IConfigTask {
+ void syncRealTimeResource(File tsFile);
- private StopPipeStatement stopPipeStatement;
+ List<File> syncHistoryTsFile(long dataStartTime);
- public StopPipeTask(StopPipeStatement stopPipeStatement) {
- this.stopPipeStatement = stopPipeStatement;
- }
+ File createHardlink(File tsFile, long modsOffset);
- @Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
- throws InterruptedException {
- return configTaskExecutor.stopPipe();
- }
+ void delete();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/LocalSyncManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/LocalSyncManager.java
new file mode 100644
index 0000000000..b366ab3e2a
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/LocalSyncManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.manager;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class LocalSyncManager implements ISyncManager {
+
+ private TsFilePipe syncPipe;
+ private final DataRegion dataRegion;
+
+ public LocalSyncManager(DataRegion dataRegion, Pipe pipe) {
+ this.dataRegion = dataRegion;
+ this.syncPipe = (TsFilePipe) pipe;
+ }
+
+ /** tsfile */
+ @Override
+ public void syncRealTimeDeletion(Deletion deletion) {
+ syncPipe.collectRealTimeDeletion(deletion,
dataRegion.getStorageGroupName());
+ }
+
+ @Override
+ public void syncRealTimeTsFile(File tsFile) {
+ syncPipe.collectRealTimeTsFile(tsFile);
+ }
+
+ @Override
+ public void syncRealTimeResource(File tsFile) {
+ syncPipe.collectRealTimeResource(tsFile);
+ }
+
+ @Override
+ public List<File> syncHistoryTsFile(long dataStartTime) {
+ return new ArrayList<>(this.dataRegion.collectHistoryTsFileForSync(this,
dataStartTime));
+ }
+
+ @Override
+ public File createHardlink(File tsFile, long modsOffset) {
+ return syncPipe.createHistoryTsFileHardlink(tsFile, modsOffset);
+ }
+
+ @Override
+ public void delete() {
+ // TODO(sync): parse to delete operation and sync
+ // 1、get timeseries
+ // 2、get time partition
+ // syncPipe.collectRealTimeDeletion();
+ }
+
+ public static List<PartialPath> splitPathPatternByDevice(PartialPath
pathPattern)
+ throws MetadataException {
+ Set<PartialPath> devices =
LocalSchemaProcessor.getInstance().getBelongedDevices(pathPattern);
+ List<PartialPath> resultPathPattern = new LinkedList<>();
+ for (PartialPath device : devices) {
+ pathPattern.alterPrefixPath(device).stream()
+ .filter(i -> !i.equals(device))
+ .forEach(resultPathPattern::add);
+ }
+ return resultPathPattern;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
deleted file mode 100644
index f04ea874a1..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender.manager;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
-
-/**
- * SchemaSyncManager is designed to collect history metadata(i.e. all storage
group and all
- * timeseries in IoTDB), and realtime metadata(i.e. create storage group,
timeseries and delete
- * timeseries operations) for all registered {@linkplain TsFilePipe}.
- */
-public class SchemaSyncManager {
-
- private static final Logger logger =
LoggerFactory.getLogger(SchemaSyncManager.class);
-
- private TsFilePipe syncPipe = null;
-
- private static class SchemaSyncManagerHolder {
-
- private SchemaSyncManagerHolder() {
- // allowed to do nothing
- }
-
- private static final SchemaSyncManager INSTANCE = new SchemaSyncManager();
- }
-
- public static SchemaSyncManager getInstance() {
- return SchemaSyncManagerHolder.INSTANCE;
- }
-
- public void registerSyncTask(TsFilePipe syncPipe) {
- this.syncPipe = syncPipe;
- }
-
- public void deregisterSyncTask() {
- this.syncPipe = null;
- }
-
- public boolean isEnableSync() {
- return syncPipe != null;
- }
-
- public void syncMetadataPlan(PhysicalPlan plan) {
- syncPipe.collectRealTimeMetaData(plan);
- }
-
- public void clear() {
- this.syncPipe = null;
- }
-
- /** only support for SchemaRegion */
- public List<PhysicalPlan> collectHistoryMetadata() {
- List<PhysicalPlan> historyMetadata = new ArrayList<>();
- List<SetStorageGroupPlan> storageGroupPlanList = getStorageGroupAsPlan();
- for (SetStorageGroupPlan storageGroupPlan : storageGroupPlanList) {
- historyMetadata.add(storageGroupPlan);
- }
-
- for (ISchemaRegion schemaRegion :
SchemaEngine.getInstance().getAllSchemaRegions()) {
- try {
- for (MeasurementPath measurementPath :
- schemaRegion.getMeasurementPaths(new
PartialPath(ALL_RESULT_NODES), false)) {
- if (measurementPath.isUnderAlignedEntity()) {
- historyMetadata.add(
- new CreateAlignedTimeSeriesPlan(
- measurementPath.getDevicePath(),
- measurementPath.getMeasurement(),
- (MeasurementSchema)
measurementPath.getMeasurementSchema()));
- } else {
- historyMetadata.add(
- new CreateTimeSeriesPlan(
- measurementPath, (MeasurementSchema)
measurementPath.getMeasurementSchema()));
- }
- }
- } catch (MetadataException e) {
- logger.warn(
- String.format(
- "Collect history schema from schemaRegion: %s of sg %s error.
Skip this schemaRegion.",
- schemaRegion.getSchemaRegionId(),
schemaRegion.getStorageGroupFullPath()));
- }
- }
-
- return historyMetadata;
- }
-
- private List<SetStorageGroupPlan> getStorageGroupAsPlan() {
- List<PartialPath> allStorageGroups =
IoTDB.configManager.getAllStorageGroupPaths();
- List<SetStorageGroupPlan> result = new LinkedList<>();
- for (PartialPath sgPath : allStorageGroups) {
- result.add(new SetStorageGroupPlan(sgPath));
- }
- return result;
- }
-
- public DeleteTimeSeriesPlan splitDeleteTimeseriesPlanByDevice(PartialPath
pathPattern)
- throws MetadataException {
- return new DeleteTimeSeriesPlan(splitPathPatternByDevice(pathPattern));
- }
-
- public List<PartialPath> splitPathPatternByDevice(PartialPath pathPattern)
- throws MetadataException {
- Set<PartialPath> devices =
IoTDB.schemaProcessor.getBelongedDevices(pathPattern);
- List<PartialPath> resultPathPattern = new LinkedList<>();
- for (PartialPath device : devices) {
- pathPattern.alterPrefixPath(device).stream()
- .filter(i -> !i.equals(device))
- .forEach(resultPathPattern::add);
- }
- return resultPathPattern;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
deleted file mode 100644
index b68868167d..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender.manager;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.storagegroup.dataregion.StorageGroupManager;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * TsFileSyncManager is designed for collect all history TsFiles(i.e. before
the pipe start time,
- * all tsfiles whose memtable is set to null.), and realtime tsfiles for
registered {@linkplain
- * TsFilePipe}.
- */
-public class TsFileSyncManager {
- private static final Logger logger =
LoggerFactory.getLogger(TsFileSyncManager.class);
-
- private TsFilePipe syncPipe;
-
- /** singleton */
- private TsFileSyncManager() {}
-
- private static class TsFileSyncManagerHolder {
- private static final TsFileSyncManager INSTANCE = new TsFileSyncManager();
-
- private TsFileSyncManagerHolder() {}
- }
-
- public static TsFileSyncManager getInstance() {
- return TsFileSyncManager.TsFileSyncManagerHolder.INSTANCE;
- }
-
- /** register */
- public void registerSyncTask(TsFilePipe syncPipe) {
- this.syncPipe = syncPipe;
- }
-
- public void deregisterSyncTask() {
- this.syncPipe = null;
- }
-
- public boolean isEnableSync() {
- return syncPipe != null;
- }
-
- public void clear() {
- syncPipe = null;
- }
-
- /** tsfile */
- public void collectRealTimeDeletion(Deletion deletion, String sgName) {
- syncPipe.collectRealTimeDeletion(deletion, sgName);
- }
-
- public void collectRealTimeTsFile(File tsFile) {
- syncPipe.collectRealTimeTsFile(tsFile);
- }
-
- public void collectRealTimeResource(File tsFile) {
- syncPipe.collectRealTimeResource(tsFile);
- }
-
- public List<File> registerAndCollectHistoryTsFile(TsFilePipe syncPipe, long
dataStartTime) {
- registerSyncTask(syncPipe);
-
- List<File> historyTsFiles = new ArrayList<>();
- Iterator<Map.Entry<PartialPath, StorageGroupManager>> sgIterator =
- StorageEngine.getInstance().getProcessorMap().entrySet().iterator();
- while (sgIterator.hasNext()) {
- historyTsFiles.addAll(
-
sgIterator.next().getValue().collectHistoryTsFileForSync(dataStartTime));
- }
-
- return historyTsFiles;
- }
-
- public File createHardlink(File tsFile, long modsOffset) {
- return syncPipe.createHistoryTsFileHardlink(tsFile, modsOffset);
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index f1afd9345a..773e591110 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.sync.transport.client.ISyncClient;
/**
@@ -107,6 +108,17 @@ public interface Pipe {
*/
void commit();
+ /**
+ * Get {@linkplain ISyncManager} by dataRegionId. If ISyncManager does not
exist, it will be
+ * created automatically.
+ *
+ * @param dataRegionId string of {@linkplain
org.apache.iotdb.commons.consensus.DataRegionId}
+ * @return ISyncManager
+ */
+ ISyncManager getOrCreateSyncManager(String dataRegionId);
+
+ void deleteSyncManager(String dataRegionId);
+
// a new pipe should be stop status
enum PipeStatus {
RUNNING,
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 55f0551000..03d1bb89d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -19,19 +19,20 @@
*/
package org.apache.iotdb.db.sync.sender.pipe;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
-import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
-import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
+import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
+import org.apache.iotdb.db.sync.sender.manager.LocalSyncManager;
import org.apache.iotdb.db.sync.sender.recovery.TsFilePipeLogger;
import org.slf4j.Logger;
@@ -41,13 +42,15 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
public class TsFilePipe implements Pipe {
private static final Logger logger =
LoggerFactory.getLogger(TsFilePipe.class);
- private final SchemaSyncManager schemaSyncManager =
SchemaSyncManager.getInstance();
- private final TsFileSyncManager tsFileSyncManager =
TsFileSyncManager.getInstance();
+ // <dataNodeId, ISyncManager>
+ private final Map<String, ISyncManager> syncManagerMap = new
ConcurrentHashMap<>();
private final long createTime;
private final String name;
@@ -60,7 +63,6 @@ public class TsFilePipe implements Pipe {
private final TsFilePipeLogger pipeLog;
private final ReentrantLock collectRealTimeDataLock;
- private boolean isCollectingRealTimeData;
private long maxSerialNumber;
private PipeStatus status;
@@ -80,7 +82,6 @@ public class TsFilePipe implements Pipe {
this.pipeLog = new TsFilePipeLogger(this);
this.collectRealTimeDataLock = new ReentrantLock();
- this.isCollectingRealTimeData = false;
this.maxSerialNumber = Math.max(0L,
realTimeQueue.getLastMaxSerialNumber());
this.status = PipeStatus.STOP;
@@ -95,42 +96,40 @@ public class TsFilePipe implements Pipe {
return;
}
+ // init sync manager
+ List<DataRegion> dataRegions =
StorageEngineV2.getInstance().getAllDataRegions();
+ for (DataRegion dataRegion : dataRegions) {
+ logger.info(
+ logFormat(
+ "init syncManager for %s-%s",
+ dataRegion.getStorageGroupName(), dataRegion.getDataRegionId()));
+ syncManagerMap.put(dataRegion.getDataRegionId(), new
LocalSyncManager(dataRegion, this));
+ }
try {
if (!pipeLog.isCollectFinished()) {
pipeLog.clear();
- collectData();
+ collectHistoryData();
pipeLog.finishCollect();
}
- if (!isCollectingRealTimeData) {
- registerMetadata();
- registerTsFile();
- isCollectingRealTimeData = true;
- }
status = PipeStatus.RUNNING;
} catch (IOException e) {
logger.error(
- String.format(
- "Clear pipe dir %s error.", SyncPathUtil.getSenderPipeDir(name,
createTime)),
+ logFormat("Clear pipe dir %s error.",
SyncPathUtil.getSenderPipeDir(name, createTime)),
e);
throw new PipeException("Start error, can not clear pipe log.");
}
}
/** collect data * */
- private void collectData() {
- registerMetadata();
- List<PhysicalPlan> historyMetadata = collectHistoryMetadata();
- List<File> historyTsFiles = registerAndCollectHistoryTsFile();
- isCollectingRealTimeData = true;
-
- // get all history data
- int historyMetadataSize = historyMetadata.size();
- int historyTsFilesSize = historyTsFiles.size();
- for (int i = 0; i < historyMetadataSize; i++) {
- long serialNumber = 1 - historyTsFilesSize - historyMetadataSize + i;
- historyQueue.offer(new SchemaPipeData(historyMetadata.get(i),
serialNumber));
+ private void collectHistoryData() {
+ // collect history TsFile
+ List<File> historyTsFiles = new ArrayList<>();
+ for (ISyncManager syncManager : syncManagerMap.values()) {
+ historyTsFiles.addAll(syncManager.syncHistoryTsFile(dataStartTime));
}
+ // put history data into PipeDataQueue
+ int historyTsFilesSize = historyTsFiles.size();
for (int i = 0; i < historyTsFilesSize; i++) {
long serialNumber = 1 - historyTsFilesSize + i;
File tsFile = historyTsFiles.get(i);
@@ -138,41 +137,6 @@ public class TsFilePipe implements Pipe {
}
}
- private void registerMetadata() {
- schemaSyncManager.registerSyncTask(this);
- }
-
- private void deregisterMetadata() {
- schemaSyncManager.deregisterSyncTask();
- }
-
- private List<PhysicalPlan> collectHistoryMetadata() {
- return schemaSyncManager.collectHistoryMetadata();
- }
-
- public void collectRealTimeMetaData(PhysicalPlan plan) {
- collectRealTimeDataLock.lock();
- try {
- maxSerialNumber += 1L;
- PipeData metaData = new SchemaPipeData(plan, maxSerialNumber);
- realTimeQueue.offer(metaData);
- } finally {
- collectRealTimeDataLock.unlock();
- }
- }
-
- private void registerTsFile() {
- tsFileSyncManager.registerSyncTask(this);
- }
-
- private void deregisterTsFile() {
- tsFileSyncManager.deregisterSyncTask();
- }
-
- private List<File> registerAndCollectHistoryTsFile() {
- return tsFileSyncManager.registerAndCollectHistoryTsFile(this,
dataStartTime);
- }
-
public File createHistoryTsFileHardlink(File tsFile, long modsOffset) {
collectRealTimeDataLock.lock(); // synchronize the pipeLog.isHardlinkExist
try {
@@ -182,8 +146,7 @@ public class TsFilePipe implements Pipe {
return pipeLog.createTsFileAndModsHardlink(tsFile, modsOffset);
} catch (IOException e) {
- logger.error(
- String.format("Create hardlink for history tsfile %s error.",
tsFile.getPath()), e);
+ logger.error(logFormat("Create hardlink for history tsfile %s error.",
tsFile.getPath()), e);
return null;
} finally {
collectRealTimeDataLock.unlock();
@@ -197,8 +160,7 @@ public class TsFilePipe implements Pipe {
return;
}
- for (PartialPath deletePath :
- schemaSyncManager.splitPathPatternByDevice(deletion.getPath())) {
+ for (PartialPath deletePath :
LocalSyncManager.splitPathPatternByDevice(deletion.getPath())) {
Deletion splitDeletion =
new Deletion(
deletePath,
@@ -210,7 +172,7 @@ public class TsFilePipe implements Pipe {
realTimeQueue.offer(deletionData);
}
} catch (MetadataException e) {
- logger.warn(String.format("Collect deletion %s error.", deletion), e);
+ logger.warn(logFormat("Collect deletion %s error.", deletion), e);
} finally {
collectRealTimeDataLock.unlock();
}
@@ -230,7 +192,7 @@ public class TsFilePipe implements Pipe {
realTimeQueue.offer(tsFileData);
} catch (IOException e) {
logger.warn(
- String.format(
+ logFormat(
"Create Hardlink tsfile %s on disk error, serial number is %d.",
tsFile.getPath(), maxSerialNumber),
e);
@@ -243,13 +205,14 @@ public class TsFilePipe implements Pipe {
try {
pipeLog.createTsFileResourceHardlink(tsFile);
} catch (IOException e) {
- logger.warn(String.format("Record tsfile resource %s on disk error.",
tsFile.getPath()), e);
+ logger.warn(logFormat("Record tsfile resource %s on disk error.",
tsFile.getPath()), e);
}
}
/** transport data * */
@Override
public PipeData take() throws InterruptedException {
+ // TODO:should judge isCollectingRealTimeData here
if (!historyQueue.isEmpty()) {
return historyQueue.take();
}
@@ -275,6 +238,23 @@ public class TsFilePipe implements Pipe {
realTimeQueue.commit();
}
+ @Override
+ public ISyncManager getOrCreateSyncManager(String dataRegionId) {
+ return syncManagerMap.computeIfAbsent(
+ dataRegionId,
+ id ->
+ new LocalSyncManager(
+ StorageEngineV2.getInstance().getDataRegion(new
DataRegionId(Integer.parseInt(id))),
+ this));
+ }
+
+ @Override
+ public void deleteSyncManager(String dataRegionId) {
+ if (syncManagerMap.containsKey(dataRegionId)) {
+ syncManagerMap.remove(dataRegionId).delete();
+ }
+ }
+
public void commit(long serialNumber) {
if (!historyQueue.isEmpty()) {
historyQueue.commit(serialNumber);
@@ -290,13 +270,6 @@ public class TsFilePipe implements Pipe {
throw new PipeException(
String.format("Can not stop pipe %s, because the pipe is drop.",
name));
}
-
- if (!isCollectingRealTimeData) {
- registerMetadata();
- registerTsFile();
- isCollectingRealTimeData = true;
- }
-
status = PipeStatus.STOP;
}
@@ -311,29 +284,24 @@ public class TsFilePipe implements Pipe {
}
private void clear() {
- deregisterMetadata();
- deregisterTsFile();
- isCollectingRealTimeData = false;
-
try {
historyQueue.clear();
realTimeQueue.clear();
pipeLog.clear();
} catch (IOException e) {
- logger.warn(String.format("Clear pipe %s %d error.", name, createTime),
e);
+ logger.warn(logFormat("Clear pipe %s %d error.", name, createTime), e);
}
}
+ private String logFormat(String format, Object... arguments) {
+ return String.format(String.format("[%s-%s] ", this.name, this.createTime)
+ format, arguments);
+ }
+
@Override
public void close() throws PipeException {
if (status == PipeStatus.DROP) {
return;
}
-
- deregisterMetadata();
- deregisterTsFile();
- isCollectingRealTimeData = false;
-
historyQueue.close();
realTimeQueue.close();
}
@@ -374,8 +342,6 @@ public class TsFilePipe implements Pipe {
+ syncDelOp
+ ", pipeLog="
+ pipeLog
- + ", isCollectingRealTimeData="
- + isCollectingRealTimeData
+ ", maxSerialNumber="
+ maxSerialNumber
+ ", status="
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
index 197946012f..3705e5b1ff 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
@@ -96,7 +96,6 @@ public class SenderManager {
PipeMessage.MsgType.WARN,
String.format(
"Transfer piepdata %s error, skip it.",
pipeData.getSerialNumber()));
- continue;
}
pipe.commit();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index 145130c5ee..704600a436 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.sync;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -30,6 +31,8 @@ import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipeInfo;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.util.Map;
+
public class SyncPipeUtil {
// TODO(sync): delete this in new-standalone version
@@ -61,6 +64,7 @@ public class SyncPipeUtil {
return pipeSink;
}
+ // TODO(sync): delete this in new-standalone version
public static Pipe parseCreatePipePlanAsPipe(
CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws
PipeException {
boolean syncDelOp = true;
@@ -77,6 +81,28 @@ public class SyncPipeUtil {
pipeCreateTime, plan.getPipeName(), pipeSink,
plan.getDataStartTimestamp(), syncDelOp);
}
+ public static Pipe parseCreatePipePlanAsPipe(
+ CreatePipeStatement createPipeStatement, PipeSink pipeSink, long
pipeCreateTime)
+ throws PipeException {
+ boolean syncDelOp = true;
+ for (Map.Entry<String, String> entry :
createPipeStatement.getPipeAttributes().entrySet()) {
+ String attributeKey = entry.getKey().toLowerCase();
+ if ("syncdelop".equals(attributeKey)) {
+ syncDelOp = Boolean.parseBoolean(entry.getValue());
+ } else {
+ throw new PipeException(String.format("Can not recognition attribute
%s", entry.getKey()));
+ }
+ }
+
+ return new TsFilePipe(
+ pipeCreateTime,
+ createPipeStatement.getPipeName(),
+ pipeSink,
+ createPipeStatement.getStartTime(),
+ syncDelOp);
+ }
+
+ // TODO(sync): delete this in new-standalone version
public static PipeInfo parseCreatePipePlanAsPipeInfo(
CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws
PipeException {
boolean syncDelOp = true;
@@ -97,6 +123,27 @@ public class SyncPipeUtil {
syncDelOp);
}
+ public static PipeInfo parseCreatePipePlanAsPipeInfo(
+ CreatePipeStatement createPipeStatement, PipeSink pipeSink, long
pipeCreateTime)
+ throws PipeException {
+ boolean syncDelOp = true;
+ for (Map.Entry<String, String> entry :
createPipeStatement.getPipeAttributes().entrySet()) {
+ String attributeKey = entry.getKey().toLowerCase();
+ if ("syncdelop".equals(attributeKey)) {
+ syncDelOp = Boolean.parseBoolean(entry.getValue());
+ } else {
+ throw new PipeException(String.format("Can not recognition attribute
%s", entry.getKey()));
+ }
+ }
+
+ return new TsFilePipeInfo(
+ createPipeStatement.getPipeName(),
+ pipeSink.getPipeSinkName(),
+ pipeCreateTime,
+ createPipeStatement.getStartTime(),
+ syncDelOp);
+ }
+
/** parse PipeInfo ass Pipe, ignore status */
public static Pipe parsePipeInfoAsPipe(PipeInfo pipeInfo, PipeSink pipeSink)
throws PipeException {
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index b7c8ed1117..5b102d45e3 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -71,7 +71,8 @@ public class TsFileProcessorTest {
private TsFileProcessor processor;
private String storageGroup = "root.vehicle";
- private StorageGroupInfo sgInfo = new StorageGroupInfo(null);
+ private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+ private StorageGroupInfo sgInfo;
private String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0,
0, 0);
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
@@ -82,12 +83,13 @@ public class TsFileProcessorTest {
private static Logger logger =
LoggerFactory.getLogger(TsFileProcessorTest.class);
@Before
- public void setUp() {
+ public void setUp() throws Exception {
File file = new File(filePath);
if (!file.getParentFile().exists()) {
Assert.assertTrue(file.getParentFile().mkdirs());
}
EnvironmentUtils.envSetUp();
+ sgInfo = new StorageGroupInfo(new
DataRegionTest.DummyDataRegion(systemDir, storageGroup));
MetadataManagerHelper.initMetadata();
context = EnvironmentUtils.TEST_QUERY_CONTEXT;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
index 6ea46e968d..7ac36ca6c1 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.sync.receiver.manager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.SyncInfo;
@@ -72,10 +72,10 @@ public class SyncInfoTest {
} catch (PipeException e) {
// throw exception because only one pipe is allowed now
}
- syncInfo.operatePipe(pipe1, Operator.OperatorType.DROP_PIPE);
+ syncInfo.operatePipe(pipe1, StatementType.DROP_PIPE);
syncInfo.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
- syncInfo.operatePipe(pipe2, Operator.OperatorType.STOP_PIPE);
- syncInfo.operatePipe(pipe2, Operator.OperatorType.START_PIPE);
+ syncInfo.operatePipe(pipe2, StatementType.STOP_PIPE);
+ syncInfo.operatePipe(pipe2, StatementType.START_PIPE);
Assert.assertEquals(2, syncInfo.getAllPipeInfos().size());
Assert.assertEquals(1, syncInfo.getAllPipeSink().size());
PipeMessage info = new PipeMessage(PipeMessage.MsgType.INFO, "info");
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index 26919b948e..d072fe166e 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.sync.receiver.recovery;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
@@ -69,11 +69,11 @@ public class SyncLogTest {
createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
log.addPipeSink(createPipeSinkPlan);
log.addPipe(new CreatePipePlan(pipe1, "demo"), 1);
- log.operatePipe(pipe1, Operator.OperatorType.DROP_PIPE);
+ log.operatePipe(pipe1, StatementType.DROP_PIPE);
log.addPipe(new CreatePipePlan(pipe2, "demo"), 2);
- log.operatePipe(pipe1, Operator.OperatorType.STOP_PIPE);
- log.operatePipe(pipe1, Operator.OperatorType.START_PIPE);
+ log.operatePipe(pipe1, StatementType.STOP_PIPE);
+ log.operatePipe(pipe1, StatementType.START_PIPE);
log.close();
SyncLogReader syncLogReader = new SyncLogReader();
syncLogReader.recover();
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 3149dd0ba0..e177f8bcac 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -364,6 +364,21 @@ struct TGetPathsSetTemplatesResp {
2: optional list<string> pathList
}
+// Show pipe
+struct TPipeInfo {
+ 1: required i64 createTime
+ 2: required string pipeName
+ 3: required string role
+ 4: required string remote
+ 5: required string status
+ 6: required string message
+}
+
+struct TShowPipeResp {
+ 1: required common.TSStatus status
+ 2: optional list<TPipeInfo> pipeInfoList
+}
+
service IConfigNodeRPCService {
// ======================================================