This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e0605ca07d [IOTDB-4116] Merge SenderService and ReceiverService into
SyncService (#6966)
e0605ca07d is described below
commit e0605ca07d53e20a9b121432851961e3e2178d01
Author: Chen YZ <[email protected]>
AuthorDate: Fri Aug 12 23:46:21 2022 +0800
[IOTDB-4116] Merge SenderService and ReceiverService into SyncService
(#6966)
---
.../db/integration/sync/IoTDBSyncReceiverIT.java | 6 +-
.../apache/iotdb/commons/service/ServiceType.java | 5 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 27 ++-
.../java/org/apache/iotdb/db/service/DataNode.java | 7 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 6 +-
.../java/org/apache/iotdb/db/service/NewIoTDB.java | 6 +-
.../SenderService.java => SyncService.java} | 213 ++++++++++++++-------
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 4 +-
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 7 +-
.../org/apache/iotdb/db/sync/common/SyncInfo.java | 5 +
.../db/sync/common/persistence/SyncLogReader.java | 2 +-
.../iotdb/db/sync/receiver/ReceiverService.java | 151 ---------------
.../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 4 +-
.../transport/client/IoTDBSInkTransportClient.java | 6 +-
.../transport/server/TransportServerManager.java | 2 +-
15 files changed, 187 insertions(+), 264 deletions(-)
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 2614dd9d2b..016ccb84c7 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -26,11 +26,11 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
@@ -95,7 +95,7 @@ public class IoTDBSyncReceiverIT {
EnvironmentUtils.cleanEnv();
EnvironmentUtils.envSetUp();
try {
- ReceiverService.getInstance().startPipeServer(true);
+ SyncService.getInstance().startPipeServer(true);
new Socket("localhost", 6670).close();
} catch (Exception e) {
Assert.fail("Failed to start pipe server because " + e.getMessage());
@@ -124,7 +124,7 @@ public class IoTDBSyncReceiverIT {
public void testStopPipeServer() {
logger.info("testStopPipeServerCheck");
try {
- ReceiverService.getInstance().stopPipeServer();
+ SyncService.getInstance().stopPipeServer();
} catch (PipeServerException e) {
Assert.fail("Can not stop pipe server");
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 904562f619..ae930eea60 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -33,11 +33,10 @@ public enum ServiceType {
JVM_MEM_CONTROL_SERVICE("Memory Controller", ""),
AUTHORIZATION_SERVICE("Authorization ServerService", ""),
FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
- SYNC_SERVICE("SYNC ServerService", ""),
UPGRADE_SERVICE("UPGRADE DataService", ""),
SETTLE_SERVICE("SETTLE DataService", ""),
- SENDER_SERVICE("Sync Sender service", ""),
- RECEIVER_SERVICE("Sync Receiver service", ""),
+ SYNC_RPC_SERVICE("Sync RPC ServerService", ""),
+ SYNC_SERVICE("Sync Service", ""),
MERGE_SERVICE("Merge Manager", "Merge Manager"),
COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE",
"PERFORMANCE_STATISTIC_SERVICE"),
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index ce944941e0..70c4883dcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -155,9 +155,8 @@ import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.query.executor.QueryRouter;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.SettleService;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
@@ -453,7 +452,7 @@ public class PlanExecutor implements IPlanExecutor {
private boolean operateStopPipeServer() throws QueryProcessException {
try {
- ReceiverService.getInstance().stopPipeServer();
+ SyncService.getInstance().stopPipeServer();
} catch (PipeServerException e) {
throw new QueryProcessException(e);
}
@@ -462,7 +461,7 @@ public class PlanExecutor implements IPlanExecutor {
private boolean operateStartPipeServer() throws QueryProcessException {
try {
- ReceiverService.getInstance().startPipeServer(false);
+ SyncService.getInstance().startPipeServer(false);
} catch (PipeServerException e) {
throw new QueryProcessException(e);
}
@@ -772,7 +771,7 @@ public class PlanExecutor implements IPlanExecutor {
}
private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
- return ReceiverService.getInstance().showPipeServer(plan);
+ return SyncService.getInstance().showPipeServer(plan);
}
private QueryDataSet processCountNodes(CountPlan countPlan) throws
MetadataException {
@@ -1295,7 +1294,7 @@ public class PlanExecutor implements IPlanExecutor {
new PartialPath(COLUMN_PIPESINK_ATTRIBUTES, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
boolean showAll = "".equals(plan.getPipeSinkName());
- for (PipeSink pipeSink : SenderService.getInstance().getAllPipeSink()) {
+ for (PipeSink pipeSink : SyncService.getInstance().getAllPipeSink()) {
if (showAll ||
plan.getPipeSinkName().equals(pipeSink.getPipeSinkName())) {
RowRecord record = new RowRecord(0);
record.addField(Binary.valueOf(pipeSink.getPipeSinkName()),
TSDataType.TEXT);
@@ -1341,8 +1340,8 @@ public class PlanExecutor implements IPlanExecutor {
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT));
- SenderService.getInstance().showPipe(plan, listDataSet);
- ReceiverService.getInstance().showPipe(plan, listDataSet);
+ SyncService.getInstance().showPipe(plan, listDataSet);
+ SyncService.getInstance().showPipe(plan, listDataSet);
// sort by create time
listDataSet.sort(Comparator.comparing(o ->
o.getFields().get(0).getStringValue()));
return listDataSet;
@@ -2520,7 +2519,7 @@ public class PlanExecutor implements IPlanExecutor {
private void createPipeSink(CreatePipeSinkPlan plan) throws
QueryProcessException {
try {
- SenderService.getInstance().addPipeSink(plan);
+ SyncService.getInstance().addPipeSink(plan);
} catch (PipeSinkException e) {
throw new QueryProcessException("Create pipeSink error.", e); // e will
override the message
} catch (IllegalArgumentException e) {
@@ -2531,7 +2530,7 @@ public class PlanExecutor implements IPlanExecutor {
private void dropPipeSink(DropPipeSinkPlan plan) throws
QueryProcessException {
try {
- SenderService.getInstance().dropPipeSink(plan.getPipeSinkName());
+ SyncService.getInstance().dropPipeSink(plan.getPipeSinkName());
} catch (PipeSinkException e) {
throw new QueryProcessException("Can not drop pipeSink.", e);
}
@@ -2539,7 +2538,7 @@ public class PlanExecutor implements IPlanExecutor {
private void createPipe(CreatePipePlan plan) throws QueryProcessException {
try {
- SenderService.getInstance().addPipe(plan);
+ SyncService.getInstance().addPipe(plan);
} catch (PipeException e) {
throw new QueryProcessException("Create pipe error.", e);
}
@@ -2548,11 +2547,11 @@ public class PlanExecutor implements IPlanExecutor {
private void operatePipe(OperatePipePlan plan) throws QueryProcessException {
try {
if (Operator.OperatorType.STOP_PIPE.equals(plan.getOperatorType())) {
- SenderService.getInstance().stopPipe(plan.getPipeName());
+ SyncService.getInstance().stopPipe(plan.getPipeName());
} else if
(Operator.OperatorType.START_PIPE.equals(plan.getOperatorType())) {
- SenderService.getInstance().startPipe(plan.getPipeName());
+ SyncService.getInstance().startPipe(plan.getPipeName());
} else if
(Operator.OperatorType.DROP_PIPE.equals(plan.getOperatorType())) {
- SenderService.getInstance().dropPipe(plan.getPipeName());
+ SyncService.getInstance().dropPipe(plan.getPipeName());
} else {
throw new QueryProcessException(
String.format("Error operator type %s.", plan.getOperatorType()),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 5dd30de66f..c5f27b0813 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -61,8 +61,7 @@ import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -282,8 +281,6 @@ public class DataNode implements DataNodeMBean {
registerUdfServices();
- registerManager.register(ReceiverService.getInstance());
-
logger.info(
"IoTDB DataNode is setting up, some storage groups may not be ready
now, please wait several seconds...");
@@ -297,7 +294,7 @@ public class DataNode implements DataNodeMBean {
}
}
- registerManager.register(SenderService.getInstance());
+ registerManager.register(SyncService.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
// in mpp mode we temporarily don't start settle service because it uses
StorageEngine directly
// in itself, but currently we need to use StorageEngineV2 instead of
StorageEngine in mpp mode.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 56ecdccb5b..f262c33a8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -48,8 +48,7 @@ import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricsService;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.wal.WALManager;
import org.slf4j.Logger;
@@ -150,7 +149,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
- registerManager.register(SenderService.getInstance());
+ registerManager.register(SyncService.getInstance());
registerManager.register(WALManager.getInstance());
registerManager.register(StorageEngine.getInstance());
@@ -165,7 +164,6 @@ public class IoTDB implements IoTDBMBean {
+ File.separator
+ "udf"
+ File.separator));
- registerManager.register(ReceiverService.getInstance());
// in cluster mode, RPC service is not enabled.
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index 5eb6db49a7..a317ee9802 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -48,8 +48,7 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.wal.WALManager;
import org.slf4j.Logger;
@@ -129,7 +128,7 @@ public class NewIoTDB implements NewIoTDBMBean {
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
- registerManager.register(SenderService.getInstance());
+ registerManager.register(SyncService.getInstance());
registerManager.register(WALManager.getInstance());
registerManager.register(StorageEngineV2.getInstance());
@@ -145,7 +144,6 @@ public class NewIoTDB implements NewIoTDBMBean {
+ File.separator
+ "udf"
+ File.separator));
- registerManager.register(ReceiverService.getInstance());
// in cluster mode, RPC service is not enabled.
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
similarity index 82%
rename from
server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
rename to server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index dd6e8c7dbb..f78978855d 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -17,26 +17,28 @@
* under the License.
*
*/
-package org.apache.iotdb.db.sync.sender.service;
+package org.apache.iotdb.db.sync;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.exception.sync.PipeServerException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
@@ -47,11 +49,15 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
+import org.apache.iotdb.db.sync.sender.service.TransportHandler;
+import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
@@ -59,11 +65,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-public class SenderService implements IService {
- private static final Logger logger =
LoggerFactory.getLogger(SenderService.class);
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STATUS;
+
+public class SyncService implements IService {
+ private static final Logger logger =
LoggerFactory.getLogger(SyncService.class);
private Pipe runningPipe;
@@ -74,16 +83,16 @@ public class SenderService implements IService {
private ISyncInfoFetcher syncInfoFetcher =
LocalSyncInfoFetcher.getInstance();
- private SenderService() {}
+ private SyncService() {}
- private static class SenderServiceHolder {
- private static final SenderService INSTANCE = new SenderService();
+ private static class SyncServiceHolder {
+ private static final SyncService INSTANCE = new SyncService();
- private SenderServiceHolder() {}
+ private SyncServiceHolder() {}
}
- public static SenderService getInstance() {
- return SenderService.SenderServiceHolder.INSTANCE;
+ public static SyncService getInstance() {
+ return SyncServiceHolder.INSTANCE;
}
// region Interfaces and Implementation of PipeSink
@@ -107,7 +116,61 @@ public class SenderService implements IService {
}
public List<PipeSink> getAllPipeSink() {
- return syncInfoFetcher.getAllPipeSink();
+ return syncInfoFetcher.getAllPipeSinks();
+ }
+
+ // endregion
+
+ // region Interfaces and Implementation of PipeServer
+
+ /**
+ * start receiver service
+ *
+ * @param isRecovery if isRecovery, it will ignore check and force a start
+ */
+ public synchronized void startPipeServer(boolean isRecovery) throws
PipeServerException {
+ if (syncInfoFetcher.isPipeServerEnable() && !isRecovery) {
+ return;
+ }
+ try {
+ TransportServerManager.getInstance().startService();
+ TSStatus status = syncInfoFetcher.startPipeServer();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeServerException("Failed to start pipe server because " +
status.getMessage());
+ }
+ } catch (StartupException e) {
+ throw new PipeServerException("Failed to start pipe server because " +
e.getMessage());
+ }
+ }
+
+ /** stop receiver service */
+ public synchronized void stopPipeServer() throws PipeServerException {
+ if (!syncInfoFetcher.isPipeServerEnable()) {
+ return;
+ }
+ TransportServerManager.getInstance().stopService();
+ TSStatus status = syncInfoFetcher.stopPipeServer();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeServerException("Failed to stop pipe server because " +
status.getMessage());
+ }
+ }
+
+ /**
+ * query by sql SHOW PIPESERVER STATUS
+ *
+ * @return QueryDataSet contained one column: enable
+ */
+ public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new
PartialPath(COLUMN_PIPESERVER_STATUS, false)),
+ Collections.singletonList(TSDataType.BOOLEAN));
+ RowRecord rowRecord = new RowRecord(0);
+ Field status = new Field(TSDataType.BOOLEAN);
+ status.setBoolV(syncInfoFetcher.isPipeServerEnable());
+ rowRecord.addField(status);
+ dataSet.putRecord(rowRecord);
+ return dataSet;
}
// endregion
@@ -241,56 +304,6 @@ public class SenderService implements IService {
}
}
- // endregion
-
- // region Interfaces and Implementation of External-Pipe
-
- /** Start ExternalPipeProcessor who handle externalPipe */
- private void startExternalPipeManager(boolean startExtPipe) throws
PipeException {
- if (!(runningPipe instanceof TsFilePipe)) {
- logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe.
" + runningPipe);
- return;
- }
-
- PipeSink pipeSink = runningPipe.getPipeSink();
- if (!(pipeSink instanceof ExternalPipeSink)) {
- logger.error("startExternalPipeManager(), pipeSink is not
ExternalPipeSink." + pipeSink);
- return;
- }
-
- String extPipeSinkTypeName = ((ExternalPipeSink)
pipeSink).getExtPipeSinkTypeName();
- IExternalPipeSinkWriterFactory externalPipeSinkWriterFactory =
-
ExtPipePluginRegister.getInstance().getWriteFactory(extPipeSinkTypeName);
- if (externalPipeSinkWriterFactory == null) {
- logger.error(
- String.format(
- "startExternalPipeManager(), can not found ExternalPipe plugin
for {}.",
- extPipeSinkTypeName));
- throw new PipeException("Can not found ExternalPipe plugin for " +
extPipeSinkTypeName + ".");
- }
-
- if (extPipePluginManager == null) {
- extPipePluginManager = new ExtPipePluginManager((TsFilePipe)
this.runningPipe);
- }
-
- if (startExtPipe) {
- try {
- extPipePluginManager.startExtPipe(
- extPipeSinkTypeName, ((ExternalPipeSink)
pipeSink).getSinkParams());
- } catch (IOException e) {
- logger.error("Failed to start External Pipe: {}.",
extPipeSinkTypeName, e);
- throw new PipeException(
- "Failed to start External Pipe: " + extPipeSinkTypeName + ". " +
e.getMessage());
- }
- }
- }
-
- public ExtPipePluginManager getExternalPipeManager() {
- return extPipePluginManager;
- }
-
- // endregion
-
public synchronized void receiveMsg(PipeMessage.MsgType type, String
message) {
if (runningPipe == null || runningPipe.getStatus() ==
Pipe.PipeStatus.DROP) {
logger.info(String.format("No running pipe for receiving msg %s.",
message));
@@ -321,7 +334,7 @@ public class SenderService implements IService {
public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
boolean showAll = "".equals(plan.getPipeName());
- for (PipeInfo pipe : SenderService.getInstance().getAllPipeInfos()) {
+ for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
RowRecord record = new RowRecord(0);
record.addField(
@@ -337,7 +350,7 @@ public class SenderService implements IService {
PipeSink pipeSink =
syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for
external pipe
ExtPipePluginManager extPipePluginManager =
- SenderService.getInstance().getExternalPipeManager();
+ SyncService.getInstance().getExternalPipeManager();
if (extPipePluginManager != null) {
String extPipeType = ((ExternalPipeSink)
pipeSink).getExtPipeSinkTypeName();
@@ -363,11 +376,71 @@ public class SenderService implements IService {
listDataSet.putRecord(record);
}
}
+ // TODO: implement show pipe in receiver
}
+ // endregion
+
+ // region Interfaces and Implementation of External-Pipe
+
+ /** Start ExternalPipeProcessor who handle externalPipe */
+ private void startExternalPipeManager(boolean startExtPipe) throws
PipeException {
+ if (!(runningPipe instanceof TsFilePipe)) {
+ logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe.
" + runningPipe);
+ return;
+ }
+
+ PipeSink pipeSink = runningPipe.getPipeSink();
+ if (!(pipeSink instanceof ExternalPipeSink)) {
+ logger.error("startExternalPipeManager(), pipeSink is not
ExternalPipeSink." + pipeSink);
+ return;
+ }
+
+ String extPipeSinkTypeName = ((ExternalPipeSink)
pipeSink).getExtPipeSinkTypeName();
+ IExternalPipeSinkWriterFactory externalPipeSinkWriterFactory =
+
ExtPipePluginRegister.getInstance().getWriteFactory(extPipeSinkTypeName);
+ if (externalPipeSinkWriterFactory == null) {
+ logger.error(
+ String.format(
+ "startExternalPipeManager(), can not found ExternalPipe plugin
for {}.",
+ extPipeSinkTypeName));
+ throw new PipeException("Can not found ExternalPipe plugin for " +
extPipeSinkTypeName + ".");
+ }
+
+ if (extPipePluginManager == null) {
+ extPipePluginManager = new ExtPipePluginManager((TsFilePipe)
this.runningPipe);
+ }
+
+ if (startExtPipe) {
+ try {
+ extPipePluginManager.startExtPipe(
+ extPipeSinkTypeName, ((ExternalPipeSink)
pipeSink).getSinkParams());
+ } catch (IOException e) {
+ logger.error("Failed to start External Pipe: {}.",
extPipeSinkTypeName, e);
+ throw new PipeException(
+ "Failed to start External Pipe: " + extPipeSinkTypeName + ". " +
e.getMessage());
+ }
+ }
+ }
+
+ public ExtPipePluginManager getExternalPipeManager() {
+ return extPipePluginManager;
+ }
+
+ // endregion
+
/** IService * */
@Override
public void start() throws StartupException {
+ // recover receiver
+ if (syncInfoFetcher.isPipeServerEnable()) {
+ try {
+ startPipeServer(true);
+ } catch (PipeServerException e) {
+ throw new StartupException(e.getMessage());
+ }
+ }
+ // recover sender
// == Check whether loading extPipe plugin successfully.
ExtPipePluginRegister extPipePluginRegister =
ExtPipePluginRegister.getInstance();
if (extPipePluginRegister == null) {
@@ -422,19 +495,17 @@ public class SenderService implements IService {
@Override
public ServiceType getID() {
- return ServiceType.SENDER_SERVICE;
+ return ServiceType.SYNC_SERVICE;
}
private void recover() throws IOException, PipeException, StartupException {
- SyncLogReader analyzer = new SyncLogReader();
- analyzer.recover();
- PipeInfo runningPipeInfo = analyzer.getRunningPipeInfo();
- this.runningPipe =
- SyncPipeUtil.parsePipeInfoAsPipe(
- runningPipeInfo,
analyzer.getAllPipeSinks().get(runningPipeInfo.getPipeSinkName()));
- if (runningPipe == null ||
Pipe.PipeStatus.DROP.equals(runningPipeInfo.getStatus())) {
+ PipeInfo runningPipeInfo = syncInfoFetcher.getRunningPipeInfo();
+ if (runningPipeInfo == null ||
Pipe.PipeStatus.DROP.equals(runningPipeInfo.getStatus())) {
return;
} else {
+ this.runningPipe =
+ SyncPipeUtil.parsePipeInfoAsPipe(
+ runningPipeInfo,
syncInfoFetcher.getPipeSink(runningPipeInfo.getPipeSinkName()));
switch (runningPipeInfo.getStatus()) {
case RUNNING:
runningPipe.start();
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index 72902febee..b1bf9e1c90 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -45,7 +45,7 @@ public interface ISyncInfoFetcher {
PipeSink getPipeSink(String name);
- List<PipeSink> getAllPipeSink();
+ List<PipeSink> getAllPipeSinks();
// endregion
// region Interfaces of Pipe
@@ -61,6 +61,8 @@ public interface ISyncInfoFetcher {
List<PipeInfo> getAllPipeInfos();
+ PipeInfo getRunningPipeInfo();
+
// endregion
String getPipeMsg(String pipeName, long createTime);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index e479082e36..93e12d4c38 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -101,7 +101,7 @@ public class LocalSyncInfoFetcher implements
ISyncInfoFetcher {
}
@Override
- public List<PipeSink> getAllPipeSink() {
+ public List<PipeSink> getAllPipeSinks() {
return syncInfo.getAllPipeSink();
}
@@ -154,6 +154,11 @@ public class LocalSyncInfoFetcher implements
ISyncInfoFetcher {
return syncInfo.getAllPipeInfos();
}
+ @Override
+ public PipeInfo getRunningPipeInfo() {
+ return syncInfo.getRunningPipeInfo();
+ }
+
@Override
public String getPipeMsg(String pipeName, long createTime) {
return syncInfo.getPipeMessage(pipeName, createTime, false).getMsg();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 1aa87001d9..8f00ca3ec9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -186,6 +186,11 @@ public class SyncInfo {
return pipes;
}
+ /** @return null if no pipe has been created */
+ public PipeInfo getRunningPipeInfo() {
+ return runningPipe;
+ }
+
private void checkIfPipeExistAndRunning(String pipeName) throws
PipeException {
if (runningPipe == null || runningPipe.getStatus() ==
Pipe.PipeStatus.DROP) {
throw new PipeException("There is no existing pipe.");
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index 361e9e8d57..66f8fe3a03 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -77,7 +77,7 @@ public class SyncLogReader {
logger.error("Sync msg log recovery error: log file parse error at
line " + lineNum);
logger.error(e.getMessage());
throw new StartupException(
- ServiceType.RECEIVER_SERVICE.getName(),
+ ServiceType.SYNC_SERVICE.getName(),
"Sync msg log file recover error at line " + lineNum);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
deleted file mode 100644
index f781932d70..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.receiver;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.sync.SyncPathUtil;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
-import org.apache.iotdb.db.query.dataset.ListDataSet;
-import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
-import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
-import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collections;
-
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STATUS;
-
-public class ReceiverService implements IService {
- private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
-
- private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
-
- /**
- * start receiver service
- *
- * @param isRecovery if isRecovery, it will ignore check and force a start
- */
- public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
- if (syncInfoFetcher.isPipeServerEnable() && !isRecovery) {
- return;
- }
- try {
- TransportServerManager.getInstance().startService();
- TSStatus status = syncInfoFetcher.startPipeServer();
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeServerException("Failed to start pipe server because " + status.getMessage());
- }
- } catch (StartupException e) {
- throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
- }
- }
-
- /** stop receiver service */
- public synchronized void stopPipeServer() throws PipeServerException {
- if (!syncInfoFetcher.isPipeServerEnable()) {
- return;
- }
- TransportServerManager.getInstance().stopService();
- TSStatus status = syncInfoFetcher.stopPipeServer();
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeServerException("Failed to stop pipe server because " + status.getMessage());
- }
- }
-
- private void createDir(String pipeName, String remoteIp, long createTime) {
- File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
- if (!f.exists()) {
- f.mkdirs();
- }
- f = new File(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
- if (!f.exists()) {
- f.mkdirs();
- }
- }
-
- /**
- * query by sql SHOW PIPESERVER STATUS
- *
- * @return QueryDataSet contained one column: enable
- */
- public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
- ListDataSet dataSet =
- new ListDataSet(
- Collections.singletonList(new PartialPath(COLUMN_PIPESERVER_STATUS, false)),
- Collections.singletonList(TSDataType.BOOLEAN));
- RowRecord rowRecord = new RowRecord(0);
- Field status = new Field(TSDataType.BOOLEAN);
- status.setBoolV(syncInfoFetcher.isPipeServerEnable());
- rowRecord.addField(status);
- dataSet.putRecord(rowRecord);
- return dataSet;
- }
-
- /** query by sql SHOW PIPE */
- public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
- // TODO: implement show pipe in receiver
- return dataSet;
- }
-
- private ReceiverService() {}
-
- public static ReceiverService getInstance() {
- return ReceiverServiceHolder.INSTANCE;
- }
-
- /** IService * */
- @Override
- public void start() throws StartupException {
- if (syncInfoFetcher.isPipeServerEnable()) {
- try {
- startPipeServer(true);
- } catch (PipeServerException e) {
- throw new StartupException(e.getMessage());
- }
- }
- }
-
- @Override
- public void stop() {}
-
- @Override
- public ServiceType getID() {
- return ServiceType.RECEIVER_SERVICE;
- }
-
- private static class ReceiverServiceHolder {
- private static final ReceiverService INSTANCE = new ReceiverService();
-
- private ReceiverServiceHolder() {}
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index 94aae7b40b..caefd130eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.sync.sender.pipe;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.transport.client.ITransportClient;
/**
@@ -55,7 +55,7 @@ public interface Pipe {
/**
* Close this pipe, stop collecting data from IoTDB, but do not delete information about this pipe
- * on disk. Used for {@linkplain SenderService#shutdown(long)}. Do not change the status of this
+ * on disk. Used for {@linkplain SyncService#shutdown(long)}. Do not change the status of this
* pipe.
*
* @throws PipeException Some inside error happens(such as IOException about disk).
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
index e15ea9e8cc..b68cd44cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
@@ -23,11 +23,11 @@ import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.service.transport.thrift.MetaInfo;
import org.apache.iotdb.service.transport.thrift.TransportStatus;
@@ -427,7 +427,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
while (!Thread.currentThread().isInterrupted()) {
try {
if (!handshake()) {
- SenderService.getInstance()
+ SyncService.getInstance()
.receiveMsg(
PipeMessage.MsgType.ERROR,
String.format("Can not handshake with %s:%d.", ipAddress, port));
@@ -437,7 +437,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
if (!senderTransport(pipeData)) {
logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
// can do something.
- SenderService.getInstance()
+ SyncService.getInstance()
.receiveMsg(
PipeMessage.MsgType.WARN,
String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
index 4bf16d20cf..05c8ce1dc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
@@ -64,7 +64,7 @@ public class TransportServerManager extends ThriftService
@Override
public ServiceType getID() {
- return ServiceType.SYNC_SERVICE;
+ return ServiceType.SYNC_RPC_SERVICE;
}
@Override