This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new 786eecb [To new_sync][IOTDB-2830] SQL: SHOW PIPE and SHOW PIPESERVER
STATUS (#5387)
786eecb is described below
commit 786eecbd9495a6a5d3f7ccfb1b08894a6ceb7172
Author: Chen YZ <[email protected]>
AuthorDate: Thu Mar 31 21:32:48 2022 +0800
[To new_sync][IOTDB-2830] SQL: SHOW PIPE and SHOW PIPESERVER STATUS (#5387)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 2 +-
.../db/integration/sync/IoTDBSyncReceiverIT.java | 99 +++++++++++-----------
.../iotdb/db/integration/sync/SyncTestUtil.java | 21 ++++-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 7 +-
.../iotdb/db/newsync/receiver/ReceiverService.java | 74 ++++++++--------
.../newsync/receiver/manager/ReceiverManager.java | 65 ++++++++++++--
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 17 ++--
.../db/qp/logical/sys/ShowPipeServerOperator.java | 15 +---
.../db/qp/physical/sys/ShowPipeServerPlan.java | 15 ----
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 7 +-
.../receiver/manager/ReceiverManagerTest.java | 4 +-
.../iotdb/db/qp/physical/PhysicalPlanTest.java | 8 +-
12 files changed, 178 insertions(+), 156 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 7e4cd62..ca15182 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -717,7 +717,7 @@ stopPipeServer
;
showPipeServer
- : SHOW PIPESERVER (pipeName=ID)?
+ : SHOW PIPESERVER
;
/**
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 358ed09..638b3f3 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -32,10 +32,7 @@ import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.newsync.transport.client.TransportClient;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
@@ -46,9 +43,6 @@ import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.io.FileUtils;
@@ -153,56 +147,65 @@ public class IoTDBSyncReceiverIT {
@Test
public void testPipeOperation() {
logger.info("testPipeOperation");
+ String[] columnNames = {"create time", "name", "role", "remote", "status",
"message"};
+ String showPipeSql = "SHOW PIPE";
try {
// create
client.heartbeat(new SyncRequest(RequestType.CREATE, pipeName1,
remoteIp1, createdTime1));
- QueryDataSet allDataSet = ReceiverService.getInstance().showPipe(new
ShowPipeServerPlan(""));
- RowRecord rowRecord = allDataSet.next();
- List<Field> fields = rowRecord.getFields();
- Assert.assertEquals(4, fields.size());
- Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
- Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
- Assert.assertEquals(PipeStatus.STOP.name(),
fields.get(2).getStringValue());
- Assert.assertEquals(
- DatetimeUtils.convertLongToDate(createdTime1),
fields.get(3).getStringValue());
- Assert.assertFalse(allDataSet.hasNext());
+ String[] retArray =
+ new String[] {
+ String.format(
+ "%s,%s,%s,%s,%s,%s",
+ DatetimeUtils.convertLongToDate(createdTime1),
+ pipeName1,
+ "receiver",
+ remoteIp1,
+ PipeStatus.STOP.name(),
+ "")
+ };
+ SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
// start
client.heartbeat(new SyncRequest(RequestType.START, pipeName1,
remoteIp1, createdTime1));
- QueryDataSet pipe1DataSet =
- ReceiverService.getInstance().showPipe(new
ShowPipeServerPlan(pipeName1));
- rowRecord = pipe1DataSet.next();
- fields = rowRecord.getFields();
- Assert.assertEquals(4, fields.size());
- Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
- Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
- Assert.assertEquals(PipeStatus.RUNNING.name(),
fields.get(2).getStringValue());
- Assert.assertEquals(
- DatetimeUtils.convertLongToDate(createdTime1),
fields.get(3).getStringValue());
- Assert.assertFalse(pipe1DataSet.hasNext());
+ retArray =
+ new String[] {
+ String.format(
+ "%s,%s,%s,%s,%s,%s",
+ DatetimeUtils.convertLongToDate(createdTime1),
+ pipeName1,
+ "receiver",
+ remoteIp1,
+ PipeStatus.RUNNING.name(),
+ "")
+ };
+ SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
// stop
client.heartbeat(new SyncRequest(RequestType.STOP, pipeName1, remoteIp1,
createdTime1));
- pipe1DataSet = ReceiverService.getInstance().showPipe(new
ShowPipeServerPlan(pipeName1));
- rowRecord = pipe1DataSet.next();
- fields = rowRecord.getFields();
- Assert.assertEquals(4, fields.size());
- Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
- Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
- Assert.assertEquals(PipeStatus.STOP.name(),
fields.get(2).getStringValue());
- Assert.assertEquals(
- DatetimeUtils.convertLongToDate(createdTime1),
fields.get(3).getStringValue());
- Assert.assertFalse(pipe1DataSet.hasNext());
+ retArray =
+ new String[] {
+ String.format(
+ "%s,%s,%s,%s,%s,%s",
+ DatetimeUtils.convertLongToDate(createdTime1),
+ pipeName1,
+ "receiver",
+ remoteIp1,
+ PipeStatus.STOP.name(),
+ "")
+ };
+ SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
// drop
client.heartbeat(new SyncRequest(RequestType.DROP, pipeName1, remoteIp1,
createdTime1));
- pipe1DataSet = ReceiverService.getInstance().showPipe(new
ShowPipeServerPlan(pipeName1));
- rowRecord = pipe1DataSet.next();
- fields = rowRecord.getFields();
- Assert.assertEquals(4, fields.size());
- Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
- Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
- Assert.assertEquals(PipeStatus.DROP.name(),
fields.get(2).getStringValue());
- Assert.assertEquals(
- DatetimeUtils.convertLongToDate(createdTime1),
fields.get(3).getStringValue());
- Assert.assertFalse(pipe1DataSet.hasNext());
+ retArray =
+ new String[] {
+ String.format(
+ "%s,%s,%s,%s,%s,%s",
+ DatetimeUtils.convertLongToDate(createdTime1),
+ pipeName1,
+ "receiver",
+ remoteIp1,
+ PipeStatus.DROP.name(),
+ "")
+ };
+ SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
index 3c32db1..a17f16f 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
@@ -154,7 +154,8 @@ public class SyncTestUtil {
}
}
- public static void checkResult(String sql, String[] columnNames, String[]
retArray)
+ public static void checkResult(
+ String sql, String[] columnNames, String[] retArray, boolean
hasTimeColumn)
throws ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection =
@@ -169,14 +170,21 @@ public class SyncTestUtil {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
map.put(resultSetMetaData.getColumnName(i), i);
}
- assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ assertEquals(
+ hasTimeColumn ? columnNames.length + 1 : columnNames.length,
+ resultSetMetaData.getColumnCount());
int cnt = 0;
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
- builder.append(resultSet.getString(1));
+ if (hasTimeColumn) {
+ builder.append(resultSet.getString(1)).append(",");
+ }
for (String columnName : columnNames) {
int index = map.get(columnName);
- builder.append(",").append(resultSet.getString(index));
+ builder.append(resultSet.getString(index)).append(",");
+ }
+ if (builder.length() > 0) {
+ builder.deleteCharAt(builder.length() - 1);
}
assertEquals(retArray[cnt], builder.toString());
cnt++;
@@ -187,4 +195,9 @@ public class SyncTestUtil {
fail(e.getMessage());
}
}
+
+ public static void checkResult(String sql, String[] columnNames, String[]
retArray)
+ throws ClassNotFoundException {
+ checkResult(sql, columnNames, retArray, true);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 18dc87f..6b5aa6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -123,17 +123,18 @@ public class IoTDBConstant {
public static final String COLUMN_TRIGGER_STATUS_STARTED = "started";
public static final String COLUMN_TRIGGER_STATUS_STOPPED = "stopped";
+ // sync module
+ public static final String COLUMN_PIPESERVER_STATUS = "enable";
public static final String COLUMN_PIPESINK_NAME = "name";
public static final String COLUMN_PIPESINK_TYPE = "type";
public static final String COLUMN_PIPESINK_ATTRIBUTES = "attributes";
public static final String COLUMN_PIPE_NAME = "name";
public static final String COLUMN_PIPE_CREATE_TIME = "create time";
- public static final String COLUMN_PIPE2PIPESINK_NAME = "pipeSink";
+ public static final String COLUMN_PIPE_ROLE = "role";
+ public static final String COLUMN_PIPE_REMOTE = "remote";
public static final String COLUMN_PIPE_STATUS = "status";
public static final String COLUMN_PIPE_MSG = "message";
- // sync receiver
- public static final String COLUMN_PIPE_REMOTE_IP = "remote ip";
public static final String ONE_LEVEL_PATH_WILDCARD = "*";
public static final String MULTI_LEVEL_PATH_WILDCARD = "**";
public static final String TIME = "time";
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index f7e03d0..1276b78 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.newsync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
import org.apache.iotdb.db.newsync.transport.server.TransportServerManager;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.apache.iotdb.db.conf.IoTDBConstant.*;
@@ -104,17 +105,9 @@ public class ReceiverService implements IService {
try {
switch (request.getType()) {
case HEARTBEAT:
- List<PipeMessage> messageList =
- receiverManager.getPipeMessages(
- request.getPipeName(), request.getRemoteIp(),
request.getCreateTime());
- PipeMessage message = new PipeMessage(PipeMessage.MsgType.INFO, "");
- if (!messageList.isEmpty()) {
- for (PipeMessage pipeMessage : messageList) {
- if (pipeMessage.getType().getValue() >
message.getType().getValue()) {
- message = pipeMessage;
- }
- }
- }
+ PipeMessage message =
+ receiverManager.getPipeMessage(
+ request.getPipeName(), request.getRemoteIp(),
request.getCreateTime(), true);
switch (message.getType()) {
case INFO:
break;
@@ -191,22 +184,25 @@ public class ReceiverService implements IService {
}
/**
- * query by sql SHOW PIPE
+ * query by sql SHOW PIPESERVER STATUS
*
- * @return QueryDataSet contained three columns: pipe name, status and start
time
+ * @return QueryDataSet contained one column: enable
*/
- public QueryDataSet showPipe(ShowPipeServerPlan plan) throws
PipeServerException {
- if (!receiverManager.isPipeServerEnable()) {
- throw new PipeServerException("Pipe server is not started.");
- }
+ public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
ListDataSet dataSet =
new ListDataSet(
- Arrays.asList(
- new PartialPath(COLUMN_PIPE_NAME, false),
- new PartialPath(COLUMN_PIPE_REMOTE_IP, false),
- new PartialPath(COLUMN_PIPE_STATUS, false),
- new PartialPath(COLUMN_CREATED_TIME, false)),
- Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT,
TSDataType.TEXT));
+ Collections.singletonList(new
PartialPath(COLUMN_PIPESERVER_STATUS, false)),
+ Collections.singletonList(TSDataType.BOOLEAN));
+ RowRecord rowRecord = new RowRecord(0);
+ Field status = new Field(TSDataType.BOOLEAN);
+ status.setBoolV(receiverManager.isPipeServerEnable());
+ rowRecord.addField(status);
+ dataSet.putRecord(rowRecord);
+ return dataSet;
+ }
+
+ /** query by sql SHOW PIPE */
+ public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
List<PipeInfo> pipeInfos;
if (!StringUtils.isEmpty(plan.getPipeName())) {
pipeInfos = receiverManager.getPipeInfos(plan.getPipeName());
@@ -220,21 +216,21 @@ public class ReceiverService implements IService {
}
private void putPipeRecord(ListDataSet dataSet, PipeInfo pipeInfo) {
- RowRecord rowRecord = new RowRecord(0);
- Field pipeNameField = new Field(TSDataType.TEXT);
- Field pipeRemoteIp = new Field(TSDataType.TEXT);
- Field pipeStatusField = new Field(TSDataType.TEXT);
- Field pipeCreateTimeField = new Field(TSDataType.TEXT);
- pipeNameField.setBinaryV(new Binary(pipeInfo.getPipeName()));
- pipeRemoteIp.setBinaryV(new Binary(pipeInfo.getRemoteIp()));
- pipeStatusField.setBinaryV(new Binary(pipeInfo.getStatus().name()));
- pipeCreateTimeField.setBinaryV(
- new Binary(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())));
- rowRecord.addField(pipeNameField);
- rowRecord.addField(pipeRemoteIp);
- rowRecord.addField(pipeStatusField);
- rowRecord.addField(pipeCreateTimeField);
- dataSet.putRecord(rowRecord);
+ RowRecord record = new RowRecord(0);
+ record.addField(
+
Binary.valueOf(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())),
TSDataType.TEXT);
+ record.addField(Binary.valueOf(pipeInfo.getPipeName()), TSDataType.TEXT);
+ record.addField(Binary.valueOf("receiver"), TSDataType.TEXT);
+ record.addField(Binary.valueOf(pipeInfo.getRemoteIp()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(pipeInfo.getStatus().name()),
TSDataType.TEXT);
+ record.addField(
+ Binary.valueOf(
+ receiverManager
+ .getPipeMessage(
+ pipeInfo.getPipeName(), pipeInfo.getRemoteIp(),
pipeInfo.getCreateTime(), false)
+ .getMsg()),
+ TSDataType.TEXT);
+ dataSet.putRecord(record);
}
private ReceiverService() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
index a531e16..6c466bf 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -99,6 +99,14 @@ public class ReceiverManager {
return res;
}
+ /**
+ * write a single message and serialize to disk
+ *
+ * @param pipeName name of pipe
+ * @param remoteIp remoteIp of pipe
+ * @param createTime createTime of pipe
+ * @param message pipe message
+ */
public void writePipeMessage(
String pipeName, String remoteIp, long createTime, PipeMessage message) {
if (pipeInfoMap.containsKey(pipeName) &&
pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
@@ -119,29 +127,68 @@ public class ReceiverManager {
}
}
- public List<PipeMessage> getPipeMessages(String pipeName, String remoteIp,
long createTime) {
+ /**
+ * read recent messages about one pipe
+ *
+ * @param pipeName name of pipe
+ * @param remoteIp remoteIp of pipe
+ * @param createTime createTime of pipe
+ * @param del if del is true, these messages will be deleted after reading. Otherwise,
these messages can be
+ * read next time.
+ * @return recent messages
+ */
+ public List<PipeMessage> getPipeMessages(
+ String pipeName, String remoteIp, long createTime, boolean del) {
List<PipeMessage> pipeMessageList = new ArrayList<>();
if (pipeInfoMap.containsKey(pipeName) &&
pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
synchronized (pipeInfoMap.get(pipeName).get(remoteIp)) {
String pipeIdentifier =
SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp,
createTime);
- try {
- log.readPipeMsg(pipeIdentifier);
- } catch (IOException e) {
- logger.error(
- "Can not read pipe message about {} from disk because {}",
- pipeIdentifier,
- e.getMessage());
+ if (del) {
+ try {
+ log.readPipeMsg(pipeIdentifier);
+ } catch (IOException e) {
+ logger.error(
+ "Can not read pipe message about {} from disk because {}",
+ pipeIdentifier,
+ e.getMessage());
+ }
}
if (pipeMessageMap.containsKey(pipeIdentifier)) {
pipeMessageList = pipeMessageMap.get(pipeIdentifier);
- pipeMessageMap.remove(pipeIdentifier);
+ if (del) {
+ pipeMessageMap.remove(pipeIdentifier);
+ }
}
}
}
return pipeMessageList;
}
+ /**
+ * read the most important message about one pipe. ERROR > WARN > INFO.
+ *
+ * @param pipeName name of pipe
+ * @param remoteIp remoteIp of pipe
+ * @param createTime createTime of pipe
+ * @param del if del is true, recent messages will be deleted after reading.
Otherwise, these messages can
+ * be read next time.
+ * @return the most important message
+ */
+ public PipeMessage getPipeMessage(
+ String pipeName, String remoteIp, long createTime, boolean del) {
+ List<PipeMessage> pipeMessageList = getPipeMessages(pipeName, remoteIp,
createTime, del);
+ PipeMessage message = new PipeMessage(PipeMessage.MsgType.INFO, "");
+ if (!pipeMessageList.isEmpty()) {
+ for (PipeMessage pipeMessage : pipeMessageList) {
+ if (pipeMessage.getType().getValue() > message.getType().getValue()) {
+ message = pipeMessage;
+ }
+ }
+ }
+ return message;
+ }
+
public boolean isPipeServerEnable() {
return pipeServerEnable;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 79330ed..7869868 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -216,13 +216,14 @@ import static
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE2PIPESINK_NAME;
import static
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPESINK_ATTRIBUTES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPESINK_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPESINK_TYPE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_CREATE_TIME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_MSG;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_REMOTE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_ROLE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_STATUS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
@@ -753,12 +754,8 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) throws
QueryProcessException {
- try {
- return ReceiverService.getInstance().showPipe(plan);
- } catch (PipeServerException e) {
- throw new QueryProcessException(e);
- }
+ private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
+ return ReceiverService.getInstance().showPipeServer(plan);
}
private QueryDataSet processCountNodes(CountPlan countPlan) throws
MetadataException {
@@ -1215,7 +1212,8 @@ public class PlanExecutor implements IPlanExecutor {
Arrays.asList(
new PartialPath(COLUMN_PIPE_CREATE_TIME, false),
new PartialPath(COLUMN_PIPE_NAME, false),
- new PartialPath(COLUMN_PIPE2PIPESINK_NAME, false),
+ new PartialPath(COLUMN_PIPE_ROLE, false),
+ new PartialPath(COLUMN_PIPE_REMOTE, false),
new PartialPath(COLUMN_PIPE_STATUS, false),
new PartialPath(COLUMN_PIPE_MSG, false)),
Arrays.asList(
@@ -1223,6 +1221,7 @@ public class PlanExecutor implements IPlanExecutor {
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT,
+ TSDataType.TEXT,
TSDataType.TEXT));
boolean showAll = "".equals(plan.getPipeName());
for (Pipe pipe : SenderService.getInstance().getAllPipes())
@@ -1231,12 +1230,14 @@ public class PlanExecutor implements IPlanExecutor {
record.addField(
Binary.valueOf(DatetimeUtils.convertLongToDate(pipe.getCreateTime())),
TSDataType.TEXT);
record.addField(Binary.valueOf(pipe.getName()), TSDataType.TEXT);
+ record.addField(Binary.valueOf("sender"), TSDataType.TEXT);
record.addField(Binary.valueOf(pipe.getPipeSink().getName()),
TSDataType.TEXT);
record.addField(Binary.valueOf(pipe.getStatus().name()),
TSDataType.TEXT);
record.addField(
Binary.valueOf(SenderService.getInstance().getPipeMsg(pipe)),
TSDataType.TEXT);
listDataSet.putRecord(record);
}
+ ReceiverService.getInstance().showPipe(plan, listDataSet);
return listDataSet;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
index 29801d1..310c909 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
@@ -24,17 +24,8 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-import org.apache.commons.lang3.StringUtils;
-
public class ShowPipeServerOperator extends ShowOperator {
- private String pipeName;
-
- public ShowPipeServerOperator(String pipeName, int tokenIntType) {
- this(tokenIntType);
- this.pipeName = pipeName;
- }
-
public ShowPipeServerOperator(int tokenIntType) {
super(tokenIntType);
}
@@ -42,10 +33,6 @@ public class ShowPipeServerOperator extends ShowOperator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- if (StringUtils.isEmpty(pipeName)) {
- return new ShowPipeServerPlan();
- } else {
- return new ShowPipeServerPlan(pipeName);
- }
+ return new ShowPipeServerPlan();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
index 955bdf5..67b664c 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
@@ -20,22 +20,7 @@ package org.apache.iotdb.db.qp.physical.sys;
public class ShowPipeServerPlan extends ShowPlan {
- private String pipeName;
-
- public ShowPipeServerPlan(String pipeName) {
- super(ShowContentType.PIPESERVER);
- this.pipeName = pipeName;
- }
-
public ShowPipeServerPlan() {
super(ShowContentType.PIPESERVER);
}
-
- public String getPipeName() {
- return pipeName;
- }
-
- public void setPipeName(String pipeName) {
- this.pipeName = pipeName;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 7593d4f..df72a4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -2299,12 +2299,7 @@ public class IoTDBSqlVisitor extends
IoTDBSqlParserBaseVisitor<Operator> {
@Override
public Operator visitShowPipeServer(IoTDBSqlParser.ShowPipeServerContext
ctx) {
- if (ctx.pipeName != null) {
- return new ShowPipeServerOperator(
- StringEscapeUtils.unescapeJava(ctx.pipeName.getText()),
SQLConstant.TOK_SHOW_PIPE_SERVER);
- } else {
- return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
- }
+ return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
}
/** 7. Common Clauses */
diff --git
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
index 6a566e4..5be7348 100644
---
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
@@ -80,12 +80,12 @@ public class ReceiverManagerTest {
PipeMessage error = new PipeMessage(PipeMessage.MsgType.ERROR, "error");
manager.writePipeMessage(pipe1, ip1, createdTime1, info);
manager.writePipeMessage(pipe1, ip1, createdTime1, warn);
- List<PipeMessage> messages = manager.getPipeMessages(pipe1, ip1,
createdTime1);
+ List<PipeMessage> messages = manager.getPipeMessages(pipe1, ip1,
createdTime1, true);
Assert.assertEquals(2, messages.size());
Assert.assertEquals(info, messages.get(0));
Assert.assertEquals(warn, messages.get(1));
manager.writePipeMessage(pipe1, ip1, createdTime1, error);
- messages = manager.getPipeMessages(pipe1, ip1, createdTime1);
+ messages = manager.getPipeMessages(pipe1, ip1, createdTime1, true);
Assert.assertEquals(1, messages.size());
Assert.assertEquals(error, messages.get(0));
manager.close();
diff --git
a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index af6a033..f8c2c53 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -1215,16 +1215,10 @@ public class PhysicalPlanTest {
@Test
public void testShowPipeServer() throws QueryProcessException {
- String sql1 = "SHOW PIPESERVER abc";
+ String sql1 = "SHOW PIPESERVER";
ShowPipeServerPlan plan1 = (ShowPipeServerPlan)
processor.parseSQLToPhysicalPlan(sql1);
Assert.assertTrue(plan1.isQuery());
Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER,
plan1.getShowContentType());
- Assert.assertNotNull(plan1.getPipeName());
- String sql2 = "SHOW PIPESERVER";
- ShowPipeServerPlan plan2 = (ShowPipeServerPlan)
processor.parseSQLToPhysicalPlan(sql2);
- Assert.assertTrue(plan2.isQuery());
- Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER,
plan2.getShowContentType());
- Assert.assertNull(plan2.getPipeName());
}
@Test