This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new 786eecb  [To new_sync][IOTDB-2830] SQL: SHOW PIPE and SHOW PIPESERVER 
STATUS (#5387)
786eecb is described below

commit 786eecbd9495a6a5d3f7ccfb1b08894a6ceb7172
Author: Chen YZ <[email protected]>
AuthorDate: Thu Mar 31 21:32:48 2022 +0800

    [To new_sync][IOTDB-2830] SQL: SHOW PIPE and SHOW PIPESERVER STATUS (#5387)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  2 +-
 .../db/integration/sync/IoTDBSyncReceiverIT.java   | 99 +++++++++++-----------
 .../iotdb/db/integration/sync/SyncTestUtil.java    | 21 ++++-
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |  7 +-
 .../iotdb/db/newsync/receiver/ReceiverService.java | 74 ++++++++--------
 .../newsync/receiver/manager/ReceiverManager.java  | 65 ++++++++++++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 17 ++--
 .../db/qp/logical/sys/ShowPipeServerOperator.java  | 15 +---
 .../db/qp/physical/sys/ShowPipeServerPlan.java     | 15 ----
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  7 +-
 .../receiver/manager/ReceiverManagerTest.java      |  4 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  8 +-
 12 files changed, 178 insertions(+), 156 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 7e4cd62..ca15182 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -717,7 +717,7 @@ stopPipeServer
     ;
 
 showPipeServer
-    : SHOW PIPESERVER (pipeName=ID)?
+    : SHOW PIPESERVER
     ;
 
 /**
diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 358ed09..638b3f3 100644
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -32,10 +32,7 @@ import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.newsync.transport.client.TransportClient;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
@@ -46,9 +43,6 @@ import org.apache.iotdb.service.transport.thrift.SyncResponse;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.io.FileUtils;
@@ -153,56 +147,65 @@ public class IoTDBSyncReceiverIT {
   @Test
   public void testPipeOperation() {
     logger.info("testPipeOperation");
+    String[] columnNames = {"create time", "name", "role", "remote", "status", 
"message"};
+    String showPipeSql = "SHOW PIPE";
     try {
       // create
       client.heartbeat(new SyncRequest(RequestType.CREATE, pipeName1, 
remoteIp1, createdTime1));
-      QueryDataSet allDataSet = ReceiverService.getInstance().showPipe(new 
ShowPipeServerPlan(""));
-      RowRecord rowRecord = allDataSet.next();
-      List<Field> fields = rowRecord.getFields();
-      Assert.assertEquals(4, fields.size());
-      Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
-      Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
-      Assert.assertEquals(PipeStatus.STOP.name(), 
fields.get(2).getStringValue());
-      Assert.assertEquals(
-          DatetimeUtils.convertLongToDate(createdTime1), 
fields.get(3).getStringValue());
-      Assert.assertFalse(allDataSet.hasNext());
+      String[] retArray =
+          new String[] {
+            String.format(
+                "%s,%s,%s,%s,%s,%s",
+                DatetimeUtils.convertLongToDate(createdTime1),
+                pipeName1,
+                "receiver",
+                remoteIp1,
+                PipeStatus.STOP.name(),
+                "")
+          };
+      SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
       // start
       client.heartbeat(new SyncRequest(RequestType.START, pipeName1, 
remoteIp1, createdTime1));
-      QueryDataSet pipe1DataSet =
-          ReceiverService.getInstance().showPipe(new 
ShowPipeServerPlan(pipeName1));
-      rowRecord = pipe1DataSet.next();
-      fields = rowRecord.getFields();
-      Assert.assertEquals(4, fields.size());
-      Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
-      Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
-      Assert.assertEquals(PipeStatus.RUNNING.name(), 
fields.get(2).getStringValue());
-      Assert.assertEquals(
-          DatetimeUtils.convertLongToDate(createdTime1), 
fields.get(3).getStringValue());
-      Assert.assertFalse(pipe1DataSet.hasNext());
+      retArray =
+          new String[] {
+            String.format(
+                "%s,%s,%s,%s,%s,%s",
+                DatetimeUtils.convertLongToDate(createdTime1),
+                pipeName1,
+                "receiver",
+                remoteIp1,
+                PipeStatus.RUNNING.name(),
+                "")
+          };
+      SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
       // stop
       client.heartbeat(new SyncRequest(RequestType.STOP, pipeName1, remoteIp1, 
createdTime1));
-      pipe1DataSet = ReceiverService.getInstance().showPipe(new 
ShowPipeServerPlan(pipeName1));
-      rowRecord = pipe1DataSet.next();
-      fields = rowRecord.getFields();
-      Assert.assertEquals(4, fields.size());
-      Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
-      Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
-      Assert.assertEquals(PipeStatus.STOP.name(), 
fields.get(2).getStringValue());
-      Assert.assertEquals(
-          DatetimeUtils.convertLongToDate(createdTime1), 
fields.get(3).getStringValue());
-      Assert.assertFalse(pipe1DataSet.hasNext());
+      retArray =
+          new String[] {
+            String.format(
+                "%s,%s,%s,%s,%s,%s",
+                DatetimeUtils.convertLongToDate(createdTime1),
+                pipeName1,
+                "receiver",
+                remoteIp1,
+                PipeStatus.STOP.name(),
+                "")
+          };
+      SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
       // drop
       client.heartbeat(new SyncRequest(RequestType.DROP, pipeName1, remoteIp1, 
createdTime1));
-      pipe1DataSet = ReceiverService.getInstance().showPipe(new 
ShowPipeServerPlan(pipeName1));
-      rowRecord = pipe1DataSet.next();
-      fields = rowRecord.getFields();
-      Assert.assertEquals(4, fields.size());
-      Assert.assertEquals(pipeName1, fields.get(0).getStringValue());
-      Assert.assertEquals(remoteIp1, fields.get(1).getStringValue());
-      Assert.assertEquals(PipeStatus.DROP.name(), 
fields.get(2).getStringValue());
-      Assert.assertEquals(
-          DatetimeUtils.convertLongToDate(createdTime1), 
fields.get(3).getStringValue());
-      Assert.assertFalse(pipe1DataSet.hasNext());
+      retArray =
+          new String[] {
+            String.format(
+                "%s,%s,%s,%s,%s,%s",
+                DatetimeUtils.convertLongToDate(createdTime1),
+                pipeName1,
+                "receiver",
+                remoteIp1,
+                PipeStatus.DROP.name(),
+                "")
+          };
+      SyncTestUtil.checkResult(showPipeSql, columnNames, retArray, false);
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
index 3c32db1..a17f16f 100644
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
+++ 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
@@ -154,7 +154,8 @@ public class SyncTestUtil {
     }
   }
 
-  public static void checkResult(String sql, String[] columnNames, String[] 
retArray)
+  public static void checkResult(
+      String sql, String[] columnNames, String[] retArray, boolean 
hasTimeColumn)
       throws ClassNotFoundException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     try (Connection connection =
@@ -169,14 +170,21 @@ public class SyncTestUtil {
       for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
         map.put(resultSetMetaData.getColumnName(i), i);
       }
-      assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+      assertEquals(
+          hasTimeColumn ? columnNames.length + 1 : columnNames.length,
+          resultSetMetaData.getColumnCount());
       int cnt = 0;
       while (resultSet.next()) {
         StringBuilder builder = new StringBuilder();
-        builder.append(resultSet.getString(1));
+        if (hasTimeColumn) {
+          builder.append(resultSet.getString(1)).append(",");
+        }
         for (String columnName : columnNames) {
           int index = map.get(columnName);
-          builder.append(",").append(resultSet.getString(index));
+          builder.append(resultSet.getString(index)).append(",");
+        }
+        if (builder.length() > 0) {
+          builder.deleteCharAt(builder.length() - 1);
         }
         assertEquals(retArray[cnt], builder.toString());
         cnt++;
@@ -187,4 +195,9 @@ public class SyncTestUtil {
       fail(e.getMessage());
     }
   }
+
+  public static void checkResult(String sql, String[] columnNames, String[] 
retArray)
+      throws ClassNotFoundException {
+    checkResult(sql, columnNames, retArray, true);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 18dc87f..6b5aa6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -123,17 +123,18 @@ public class IoTDBConstant {
   public static final String COLUMN_TRIGGER_STATUS_STARTED = "started";
   public static final String COLUMN_TRIGGER_STATUS_STOPPED = "stopped";
 
+  // sync module
+  public static final String COLUMN_PIPESERVER_STATUS = "enable";
   public static final String COLUMN_PIPESINK_NAME = "name";
   public static final String COLUMN_PIPESINK_TYPE = "type";
   public static final String COLUMN_PIPESINK_ATTRIBUTES = "attributes";
   public static final String COLUMN_PIPE_NAME = "name";
   public static final String COLUMN_PIPE_CREATE_TIME = "create time";
-  public static final String COLUMN_PIPE2PIPESINK_NAME = "pipeSink";
+  public static final String COLUMN_PIPE_ROLE = "role";
+  public static final String COLUMN_PIPE_REMOTE = "remote";
   public static final String COLUMN_PIPE_STATUS = "status";
   public static final String COLUMN_PIPE_MSG = "message";
 
-  // sync receiver
-  public static final String COLUMN_PIPE_REMOTE_IP = "remote ip";
   public static final String ONE_LEVEL_PATH_WILDCARD = "*";
   public static final String MULTI_LEVEL_PATH_WILDCARD = "**";
   public static final String TIME = "time";
diff --git 
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
 
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index f7e03d0..1276b78 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.newsync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
 import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
 import org.apache.iotdb.db.newsync.transport.server.TransportServerManager;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.*;
@@ -104,17 +105,9 @@ public class ReceiverService implements IService {
     try {
       switch (request.getType()) {
         case HEARTBEAT:
-          List<PipeMessage> messageList =
-              receiverManager.getPipeMessages(
-                  request.getPipeName(), request.getRemoteIp(), 
request.getCreateTime());
-          PipeMessage message = new PipeMessage(PipeMessage.MsgType.INFO, "");
-          if (!messageList.isEmpty()) {
-            for (PipeMessage pipeMessage : messageList) {
-              if (pipeMessage.getType().getValue() > 
message.getType().getValue()) {
-                message = pipeMessage;
-              }
-            }
-          }
+          PipeMessage message =
+              receiverManager.getPipeMessage(
+                  request.getPipeName(), request.getRemoteIp(), 
request.getCreateTime(), true);
           switch (message.getType()) {
             case INFO:
               break;
@@ -191,22 +184,25 @@ public class ReceiverService implements IService {
   }
 
   /**
-   * query by sql SHOW PIPE
+   * query by sql SHOW PIPESERVER STATUS
    *
-   * @return QueryDataSet contained three columns: pipe name, status and start 
time
+   * @return QueryDataSet contained one column: enable
    */
-  public QueryDataSet showPipe(ShowPipeServerPlan plan) throws 
PipeServerException {
-    if (!receiverManager.isPipeServerEnable()) {
-      throw new PipeServerException("Pipe server is not started.");
-    }
+  public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
     ListDataSet dataSet =
         new ListDataSet(
-            Arrays.asList(
-                new PartialPath(COLUMN_PIPE_NAME, false),
-                new PartialPath(COLUMN_PIPE_REMOTE_IP, false),
-                new PartialPath(COLUMN_PIPE_STATUS, false),
-                new PartialPath(COLUMN_CREATED_TIME, false)),
-            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, 
TSDataType.TEXT));
+            Collections.singletonList(new 
PartialPath(COLUMN_PIPESERVER_STATUS, false)),
+            Collections.singletonList(TSDataType.BOOLEAN));
+    RowRecord rowRecord = new RowRecord(0);
+    Field status = new Field(TSDataType.BOOLEAN);
+    status.setBoolV(receiverManager.isPipeServerEnable());
+    rowRecord.addField(status);
+    dataSet.putRecord(rowRecord);
+    return dataSet;
+  }
+
+  /** query by sql SHOW PIPE */
+  public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
     List<PipeInfo> pipeInfos;
     if (!StringUtils.isEmpty(plan.getPipeName())) {
       pipeInfos = receiverManager.getPipeInfos(plan.getPipeName());
@@ -220,21 +216,21 @@ public class ReceiverService implements IService {
   }
 
   private void putPipeRecord(ListDataSet dataSet, PipeInfo pipeInfo) {
-    RowRecord rowRecord = new RowRecord(0);
-    Field pipeNameField = new Field(TSDataType.TEXT);
-    Field pipeRemoteIp = new Field(TSDataType.TEXT);
-    Field pipeStatusField = new Field(TSDataType.TEXT);
-    Field pipeCreateTimeField = new Field(TSDataType.TEXT);
-    pipeNameField.setBinaryV(new Binary(pipeInfo.getPipeName()));
-    pipeRemoteIp.setBinaryV(new Binary(pipeInfo.getRemoteIp()));
-    pipeStatusField.setBinaryV(new Binary(pipeInfo.getStatus().name()));
-    pipeCreateTimeField.setBinaryV(
-        new Binary(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())));
-    rowRecord.addField(pipeNameField);
-    rowRecord.addField(pipeRemoteIp);
-    rowRecord.addField(pipeStatusField);
-    rowRecord.addField(pipeCreateTimeField);
-    dataSet.putRecord(rowRecord);
+    RowRecord record = new RowRecord(0);
+    record.addField(
+        
Binary.valueOf(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())), 
TSDataType.TEXT);
+    record.addField(Binary.valueOf(pipeInfo.getPipeName()), TSDataType.TEXT);
+    record.addField(Binary.valueOf("receiver"), TSDataType.TEXT);
+    record.addField(Binary.valueOf(pipeInfo.getRemoteIp()), TSDataType.TEXT);
+    record.addField(Binary.valueOf(pipeInfo.getStatus().name()), 
TSDataType.TEXT);
+    record.addField(
+        Binary.valueOf(
+            receiverManager
+                .getPipeMessage(
+                    pipeInfo.getPipeName(), pipeInfo.getRemoteIp(), 
pipeInfo.getCreateTime(), false)
+                .getMsg()),
+        TSDataType.TEXT);
+    dataSet.putRecord(record);
   }
 
   private ReceiverService() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
 
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
index a531e16..6c466bf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -99,6 +99,14 @@ public class ReceiverManager {
     return res;
   }
 
+  /**
+   * write a single message and serialize to disk
+   *
+   * @param pipeName name of pipe
+   * @param remoteIp remoteIp of pipe
+   * @param createTime createTime of pipe
+   * @param message pipe message
+   */
   public void writePipeMessage(
       String pipeName, String remoteIp, long createTime, PipeMessage message) {
     if (pipeInfoMap.containsKey(pipeName) && 
pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
@@ -119,29 +127,68 @@ public class ReceiverManager {
     }
   }
 
-  public List<PipeMessage> getPipeMessages(String pipeName, String remoteIp, 
long createTime) {
+  /**
+   * read recent messages about one pipe
+   *
+   * @param pipeName name of pipe
+   * @param remoteIp remoteIp of pipe
+   * @param createTime createTime of pipe
+   * @param del if del is true, these messages will not be deleted. Otherwise, 
these messages can be
+   *     read next time.
+   * @return recent messages
+   */
+  public List<PipeMessage> getPipeMessages(
+      String pipeName, String remoteIp, long createTime, boolean del) {
     List<PipeMessage> pipeMessageList = new ArrayList<>();
     if (pipeInfoMap.containsKey(pipeName) && 
pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
       synchronized (pipeInfoMap.get(pipeName).get(remoteIp)) {
         String pipeIdentifier =
             SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, 
createTime);
-        try {
-          log.readPipeMsg(pipeIdentifier);
-        } catch (IOException e) {
-          logger.error(
-              "Can not read pipe message about {} from disk because {}",
-              pipeIdentifier,
-              e.getMessage());
+        if (del) {
+          try {
+            log.readPipeMsg(pipeIdentifier);
+          } catch (IOException e) {
+            logger.error(
+                "Can not read pipe message about {} from disk because {}",
+                pipeIdentifier,
+                e.getMessage());
+          }
         }
         if (pipeMessageMap.containsKey(pipeIdentifier)) {
           pipeMessageList = pipeMessageMap.get(pipeIdentifier);
-          pipeMessageMap.remove(pipeIdentifier);
+          if (del) {
+            pipeMessageMap.remove(pipeIdentifier);
+          }
         }
       }
     }
     return pipeMessageList;
   }
 
+  /**
+   * read the most important message about one pipe. ERROR > WARN > INFO.
+   *
+   * @param pipeName name of pipe
+   * @param remoteIp remoteIp of pipe
+   * @param createTime createTime of pipe
+   * @param del if del is true, recent messages will not be deleted. 
Otherwise, these messages can
+   *     be read next time.
+   * @return the most important message
+   */
+  public PipeMessage getPipeMessage(
+      String pipeName, String remoteIp, long createTime, boolean del) {
+    List<PipeMessage> pipeMessageList = getPipeMessages(pipeName, remoteIp, 
createTime, del);
+    PipeMessage message = new PipeMessage(PipeMessage.MsgType.INFO, "");
+    if (!pipeMessageList.isEmpty()) {
+      for (PipeMessage pipeMessage : pipeMessageList) {
+        if (pipeMessage.getType().getValue() > message.getType().getValue()) {
+          message = pipeMessage;
+        }
+      }
+    }
+    return message;
+  }
+
   public boolean isPipeServerEnable() {
     return pipeServerEnable;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 79330ed..7869868 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -216,13 +216,14 @@ import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE2PIPESINK_NAME;
 import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPESINK_ATTRIBUTES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPESINK_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPESINK_TYPE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_CREATE_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_MSG;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_REMOTE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PIPE_STATUS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
@@ -753,12 +754,8 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) throws 
QueryProcessException {
-    try {
-      return ReceiverService.getInstance().showPipe(plan);
-    } catch (PipeServerException e) {
-      throw new QueryProcessException(e);
-    }
+  private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
+    return ReceiverService.getInstance().showPipeServer(plan);
   }
 
   private QueryDataSet processCountNodes(CountPlan countPlan) throws 
MetadataException {
@@ -1215,7 +1212,8 @@ public class PlanExecutor implements IPlanExecutor {
             Arrays.asList(
                 new PartialPath(COLUMN_PIPE_CREATE_TIME, false),
                 new PartialPath(COLUMN_PIPE_NAME, false),
-                new PartialPath(COLUMN_PIPE2PIPESINK_NAME, false),
+                new PartialPath(COLUMN_PIPE_ROLE, false),
+                new PartialPath(COLUMN_PIPE_REMOTE, false),
                 new PartialPath(COLUMN_PIPE_STATUS, false),
                 new PartialPath(COLUMN_PIPE_MSG, false)),
             Arrays.asList(
@@ -1223,6 +1221,7 @@ public class PlanExecutor implements IPlanExecutor {
                 TSDataType.TEXT,
                 TSDataType.TEXT,
                 TSDataType.TEXT,
+                TSDataType.TEXT,
                 TSDataType.TEXT));
     boolean showAll = "".equals(plan.getPipeName());
     for (Pipe pipe : SenderService.getInstance().getAllPipes())
@@ -1231,12 +1230,14 @@ public class PlanExecutor implements IPlanExecutor {
         record.addField(
             
Binary.valueOf(DatetimeUtils.convertLongToDate(pipe.getCreateTime())), 
TSDataType.TEXT);
         record.addField(Binary.valueOf(pipe.getName()), TSDataType.TEXT);
+        record.addField(Binary.valueOf("sender"), TSDataType.TEXT);
         record.addField(Binary.valueOf(pipe.getPipeSink().getName()), 
TSDataType.TEXT);
         record.addField(Binary.valueOf(pipe.getStatus().name()), 
TSDataType.TEXT);
         record.addField(
             Binary.valueOf(SenderService.getInstance().getPipeMsg(pipe)), 
TSDataType.TEXT);
         listDataSet.putRecord(record);
       }
+    ReceiverService.getInstance().showPipe(plan, listDataSet);
     return listDataSet;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
index 29801d1..310c909 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
@@ -24,17 +24,8 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
-import org.apache.commons.lang3.StringUtils;
-
 public class ShowPipeServerOperator extends ShowOperator {
 
-  private String pipeName;
-
-  public ShowPipeServerOperator(String pipeName, int tokenIntType) {
-    this(tokenIntType);
-    this.pipeName = pipeName;
-  }
-
   public ShowPipeServerOperator(int tokenIntType) {
     super(tokenIntType);
   }
@@ -42,10 +33,6 @@ public class ShowPipeServerOperator extends ShowOperator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    if (StringUtils.isEmpty(pipeName)) {
-      return new ShowPipeServerPlan();
-    } else {
-      return new ShowPipeServerPlan(pipeName);
-    }
+    return new ShowPipeServerPlan();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
index 955bdf5..67b664c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
@@ -20,22 +20,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 public class ShowPipeServerPlan extends ShowPlan {
 
-  private String pipeName;
-
-  public ShowPipeServerPlan(String pipeName) {
-    super(ShowContentType.PIPESERVER);
-    this.pipeName = pipeName;
-  }
-
   public ShowPipeServerPlan() {
     super(ShowContentType.PIPESERVER);
   }
-
-  public String getPipeName() {
-    return pipeName;
-  }
-
-  public void setPipeName(String pipeName) {
-    this.pipeName = pipeName;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 7593d4f..df72a4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -2299,12 +2299,7 @@ public class IoTDBSqlVisitor extends 
IoTDBSqlParserBaseVisitor<Operator> {
 
   @Override
   public Operator visitShowPipeServer(IoTDBSqlParser.ShowPipeServerContext 
ctx) {
-    if (ctx.pipeName != null) {
-      return new ShowPipeServerOperator(
-          StringEscapeUtils.unescapeJava(ctx.pipeName.getText()), 
SQLConstant.TOK_SHOW_PIPE_SERVER);
-    } else {
-      return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
-    }
+    return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
   }
 
   /** 7. Common Clauses */
diff --git 
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
index 6a566e4..5be7348 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
@@ -80,12 +80,12 @@ public class ReceiverManagerTest {
       PipeMessage error = new PipeMessage(PipeMessage.MsgType.ERROR, "error");
       manager.writePipeMessage(pipe1, ip1, createdTime1, info);
       manager.writePipeMessage(pipe1, ip1, createdTime1, warn);
-      List<PipeMessage> messages = manager.getPipeMessages(pipe1, ip1, 
createdTime1);
+      List<PipeMessage> messages = manager.getPipeMessages(pipe1, ip1, 
createdTime1, true);
       Assert.assertEquals(2, messages.size());
       Assert.assertEquals(info, messages.get(0));
       Assert.assertEquals(warn, messages.get(1));
       manager.writePipeMessage(pipe1, ip1, createdTime1, error);
-      messages = manager.getPipeMessages(pipe1, ip1, createdTime1);
+      messages = manager.getPipeMessages(pipe1, ip1, createdTime1, true);
       Assert.assertEquals(1, messages.size());
       Assert.assertEquals(error, messages.get(0));
       manager.close();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java 
b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index af6a033..f8c2c53 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -1215,16 +1215,10 @@ public class PhysicalPlanTest {
 
   @Test
   public void testShowPipeServer() throws QueryProcessException {
-    String sql1 = "SHOW PIPESERVER abc";
+    String sql1 = "SHOW PIPESERVER";
     ShowPipeServerPlan plan1 = (ShowPipeServerPlan) 
processor.parseSQLToPhysicalPlan(sql1);
     Assert.assertTrue(plan1.isQuery());
     Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, 
plan1.getShowContentType());
-    Assert.assertNotNull(plan1.getPipeName());
-    String sql2 = "SHOW PIPESERVER";
-    ShowPipeServerPlan plan2 = (ShowPipeServerPlan) 
processor.parseSQLToPhysicalPlan(sql2);
-    Assert.assertTrue(plan2.isQuery());
-    Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, 
plan2.getShowContentType());
-    Assert.assertNull(plan2.getPipeName());
   }
 
   @Test

Reply via email to