This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d12a9b33940 Pipe: Implemented better permission check for showing pipe
(#16403)
d12a9b33940 is described below
commit d12a9b33940696659270cb929398f4b8e158d16a
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 15 14:08:32 2025 +0800
Pipe: Implemented better permission check for showing pipe (#16403)
* logger
* fix
* fix
* fix-IT
* continue-fix
* fix
* temporary
* Add-IT
* Fix
* fix
* add-test
* fix
* fix
* fix
* fix
* fix-logger-show
* fix
---
.../tablemodel/manual/basic/IoTDBPipeAlterIT.java | 41 +++++++++---
.../manual/basic/IoTDBPipeLifeCycleIT.java | 15 -----
.../manual/basic/IoTDBPipePermissionIT.java | 59 +++++++++++++----
.../manual/basic/IoTDBPipeSwitchStatusIT.java | 55 +++++++++++-----
.../tablemodel/manual/basic/IoTDBPipeSyntaxIT.java | 47 ++++++++++---
.../manual/enhanced/IoTDBPipeClusterIT.java | 10 ++-
.../manual/enhanced/IoTDBPipeDoubleLivingIT.java | 5 +-
.../enhanced/IoTDBPipeSinkCompressionIT.java | 4 +-
.../treemodel/auto/basic/IoTDBPipeAlterIT.java | 34 ++++++----
.../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 4 +-
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 4 +-
.../auto/basic/IoTDBPipeSwitchStatusIT.java | 55 +++++++++++-----
.../treemodel/auto/basic/IoTDBPipeSyntaxIT.java | 25 ++++---
.../auto/enhanced/IoTDBPipeClusterIT.java | 13 ++--
.../enhanced/IoTDBPipeConditionalOperationsIT.java | 16 +++--
.../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 4 +-
.../treemodel/auto/enhanced/PipeNowFunctionIT.java | 13 ++--
.../pipe/it/single/IoTDBPipePermissionIT.java | 30 ++++++++-
.../relational/it/schema/IoTDBDatabaseIT.java | 10 ++-
.../response/pipe/task/PipeTableResp.java | 76 ++++++++++++++++++++--
.../pipe/coordinator/task/PipeTaskCoordinator.java | 2 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 42 ++++++------
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../schemachange/RenameDatabaseProcessor.java | 2 +-
.../InformationSchemaContentSupplierFactory.java | 5 +-
.../execution/config/TableConfigTaskVisitor.java | 5 +-
.../config/executor/ClusterConfigTaskExecutor.java | 6 +-
.../config/executor/IConfigTaskExecutor.java | 3 +-
.../execution/config/sys/pipe/ShowPipeTask.java | 6 +-
.../schemaregion/utils/MetaFormatUtils.java | 2 +-
.../commons/auth/authorizer/BasicAuthorizer.java | 3 -
.../iotdb/commons/auth/authorizer/IAuthorizer.java | 2 +
.../auth/authorizer/LocalFileAuthorizer.java | 2 +-
.../src/main/thrift/confignode.thrift | 1 +
34 files changed, 433 insertions(+), 170 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
index d85383655a9..640b9bbefbe 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
@@ -76,7 +77,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -109,7 +112,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -130,7 +135,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -167,7 +174,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -203,7 +212,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -247,7 +258,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -281,7 +294,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -315,7 +330,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -351,7 +368,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -384,7 +403,9 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeLifeCycleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeLifeCycleIT.java
index bd253933c5c..e4eeec965d4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeLifeCycleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeLifeCycleIT.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static
org.apache.iotdb.db.it.utils.TestUtils.assertTableNonQueryTestFail;
-import static org.apache.iotdb.db.it.utils.TestUtils.assertTableTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
@@ -741,13 +740,6 @@ public class IoTDBPipeLifeCycleIT extends
AbstractPipeTableModelDualManualIT {
"test",
"test123123456",
null);
- assertTableTestFail(
- senderEnv,
- "show pipes",
- "803: Access Denied: No permissions for this operation, only root user
is allowed",
- "test",
- "test123123456",
- null);
assertTableNonQueryTestFail(
senderEnv,
"start pipe testPipe",
@@ -777,12 +769,5 @@ public class IoTDBPipeLifeCycleIT extends
AbstractPipeTableModelDualManualIT {
"test",
"test123123456",
null);
- assertTableTestFail(
- senderEnv,
- "show pipe plugins",
- "803: Access Denied: No permissions for this operation, only root user
is allowed",
- "test",
- "test123123456",
- null);
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 803880202d1..fc803356f07 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
@@ -95,7 +96,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
}
@Test
- public void testSourcePermission() {
+ public void testSourcePermission() throws Exception {
if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv, "create user `thulab` 'passwD@123456'", null)) {
return;
@@ -244,6 +245,36 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
});
+
+ // test showing pipe
+ // Create another pipe, user is root
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2c"
+ + " with source ("
+ + "'inclusion'='all',"
+ + "'capture.tree'='true',"
+ + "'capture.table'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail("Create pipe without user shall succeed if use the current
session");
+ }
+
+ // A user shall only see its own pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ Assert.assertEquals(
+ 1,
+ client
+ .showPipe(new
TShowPipeReq().setIsTableModel(true).setUserName("thulab"))
+ .pipeInfoList
+ .size());
+ }
}
@Test
@@ -260,28 +291,28 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
return;
}
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
final String dbName = "test";
final String tbName = "test";
- extractorAttributes.put("extractor.inclusion", "all");
- extractorAttributes.put("extractor.capture.tree", "false");
- extractorAttributes.put("extractor.capture.table", "true");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("extractor.inclusion", "all");
+ sourceAttributes.put("extractor.capture.tree", "false");
+ sourceAttributes.put("extractor.capture.table", "true");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
- connectorAttributes.put("connector.user", "testUser");
- connectorAttributes.put("connector.password", "passwD@123456");
+ sinkAttributes.put("connector", "iotdb-thrift-connector");
+ sinkAttributes.put("connector.ip", receiverIp);
+ sinkAttributes.put("connector.port", Integer.toString(receiverPort));
+ sinkAttributes.put("connector.user", "testUser");
+ sinkAttributes.put("connector.password", "passwD@123456");
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSwitchStatusIT.java
index 599325593c6..fc96eb05aa7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSwitchStatusIT.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
@@ -102,7 +103,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -118,7 +120,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p3").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -131,7 +134,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p2").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -140,7 +144,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
}
@@ -182,7 +187,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -194,20 +200,23 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
status.getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.stream().filter((o) ->
o.id.equals("p1")).count());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -215,7 +224,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -267,14 +277,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(
0,
@@ -288,7 +300,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.stream().filter((o) ->
o.id.equals("p1")).count());
}
@@ -337,14 +350,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("p").getCode());
Assert.assertEquals(
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("*").getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -357,14 +372,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("p").getCode());
Assert.assertEquals(
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("*").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -377,14 +394,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelDualManualIT
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.dropPipe("p").getCode());
Assert.assertEquals(
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.dropPipe("*").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSyntaxIT.java
index 6c0d09aac70..aa039af957b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSyntaxIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
@@ -99,7 +100,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
for (final String pipeName : expectedPipeNames) {
Assert.assertTrue(
@@ -117,7 +120,10 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
}
- showPipeResult = client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ showPipeResult =
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -190,7 +196,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -264,7 +272,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(2, showPipeResult.size());
}
@@ -332,7 +342,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
}
@@ -541,7 +553,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(2, showPipeResult.size());
}
@@ -601,12 +615,19 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(3, showPipeResult.size());
showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true).setPipeName("p1")).pipeInfoList;
+ client.showPipe(
+ new TShowPipeReq()
+ .setIsTableModel(true)
+ .setPipeName("p1")
+ .setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p2")));
@@ -616,7 +637,11 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
// p1 and p2 share the same connector parameters, so they have the same
connector.
showPipeResult =
client.showPipe(
- new
TShowPipeReq().setIsTableModel(true).setPipeName("p1").setWhereClause(true))
+ new TShowPipeReq()
+ .setIsTableModel(true)
+ .setPipeName("p1")
+ .setWhereClause(true)
+ .setUserName(SessionConfig.DEFAULT_USER))
.pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
@@ -708,7 +733,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelDualManualIT {
}
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 12f2857c7c1..b23284682eb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -592,7 +593,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(30, showPipeResult.size());
}
@@ -907,7 +909,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
Assert.assertTrue(successCount.get() >= 1);
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -985,7 +988,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(successCount.get(), showPipeResult.size());
showPipeResult =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java
index 93d9697e941..9d29d13a16c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
@@ -110,7 +111,9 @@ public class IoTDBPipeDoubleLivingIT extends
AbstractPipeTableModelDualManualIT
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index e91b156e09c..31935550b94 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -336,7 +337,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeTableModelDualManual
Assert.assertTrue(e.getMessage().contains("Zstd compression level
should be in the range"));
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(
3,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
index 5e2291f3760..aff44028d91 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
@@ -75,7 +76,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
long lastCreationTime;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -107,7 +109,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -125,7 +128,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -163,7 +167,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -199,7 +204,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -242,7 +248,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -279,7 +286,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -316,7 +324,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -353,7 +362,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -390,7 +400,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// check status
@@ -427,7 +438,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 4cc50e97323..31308f1c0e1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -93,7 +94,8 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualTreeModelAutoIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(2, showPipeResult.size());
Assert.assertTrue(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 6f95d3d1f6c..1a75ffd2f10 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -1076,7 +1077,8 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
private void assertPipeCount(int count) throws Exception {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(count, showPipeResult.size());
// for (TShowPipeInfo showPipeInfo : showPipeResult) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSwitchStatusIT.java
index 64f6cb30c53..9ab3eb92b3c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSwitchStatusIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
@@ -91,7 +92,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -107,7 +109,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p3").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -120,7 +123,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p2").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -129,7 +133,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
}
@@ -160,7 +165,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -172,20 +178,23 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
status.getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.stream().filter((o) ->
o.id.equals("p1")).count());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -193,7 +202,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -232,7 +242,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -243,7 +254,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(
0,
@@ -257,7 +269,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.stream().filter((o) ->
o.id.equals("p1")).count());
}
@@ -295,14 +308,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("p").getCode());
Assert.assertEquals(
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("*").getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -315,14 +330,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("p").getCode());
Assert.assertEquals(
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("*").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
@@ -335,14 +352,16 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualTreeModelAutoIT {
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.dropPipe("p").getCode());
Assert.assertEquals(
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.dropPipe("*").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
index 37b4ad5c73e..e925e9e4428 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
@@ -88,7 +89,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
}
}
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
for (final String pipeName : expectedPipeNames) {
Assert.assertTrue(
@@ -106,7 +108,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
}
}
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -166,7 +169,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
} catch (SQLException ignored) {
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -239,7 +243,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
fail(e.getMessage());
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(2, showPipeResult.size());
}
@@ -323,7 +328,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
} catch (SQLException ignored) {
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
}
@@ -582,7 +588,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
} catch (SQLException ignored) {
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(9, showPipeResult.size());
}
@@ -629,7 +636,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(3, showPipeResult.size());
@@ -732,7 +740,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
fail(e.getMessage());
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index 2656b38f046..b6569da39b0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -576,7 +577,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(30, showPipeResult.size());
}
@@ -808,7 +810,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
"count(root.db.d1.s1),",
Collections.singleton(succeedNum + ","));
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
}
@@ -957,7 +960,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertTrue(successCount.get() >= 1);
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -1030,7 +1034,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(successCount.get(), showPipeResult.size());
showPipeResult =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeConditionalOperationsIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeConditionalOperationsIT.java
index 7361f72db2d..7d0e7f71c0a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeConditionalOperationsIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeConditionalOperationsIT.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced;
@@ -70,7 +71,8 @@ public class IoTDBPipeConditionalOperationsIT extends
AbstractPipeDualTreeModelA
long creationTime;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -106,7 +108,8 @@ public class IoTDBPipeConditionalOperationsIT extends
AbstractPipeDualTreeModelA
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
@@ -155,7 +158,8 @@ public class IoTDBPipeConditionalOperationsIT extends
AbstractPipeDualTreeModelA
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -187,7 +191,8 @@ public class IoTDBPipeConditionalOperationsIT extends
AbstractPipeDualTreeModelA
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(0, showPipeResult.size());
}
@@ -219,7 +224,8 @@ public class IoTDBPipeConditionalOperationsIT extends
AbstractPipeDualTreeModelA
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(1, showPipeResult.size());
// Check status
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 40d8c31725f..b87b2154015 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -325,7 +326,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
Assert.assertTrue(e.getMessage().contains("Zstd compression level
should be in the range"));
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(
3,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/PipeNowFunctionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/PipeNowFunctionIT.java
index 0b6e56cd466..cd3009dd9f8 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/PipeNowFunctionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/PipeNowFunctionIT.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced;
@@ -116,7 +117,8 @@ public class PipeNowFunctionIT extends
AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -124,7 +126,8 @@ public class PipeNowFunctionIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -144,7 +147,8 @@ public class PipeNowFunctionIT extends
AbstractPipeDualTreeModelAutoIT {
.setConnectorAttributes(new HashMap<>())
.setIsReplaceAllConnectorAttributes(false));
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -167,7 +171,8 @@ public class PipeNowFunctionIT extends
AbstractPipeDualTreeModelAutoIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertTrue(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
index 7084f4ffd49..8580dd84707 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
@@ -19,12 +19,15 @@
package org.apache.iotdb.pipe.it.single;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -40,7 +43,7 @@ import static org.junit.Assert.fail;
@Category({MultiClusterIT1.class})
public class IoTDBPipePermissionIT extends AbstractPipeSingleIT {
@Test
- public void testSinkPermission() {
+ public void testSinkPermission() throws Exception {
if (!TestUtils.tryExecuteNonQueryWithRetry(env, "create user `thulab`
'passwd'", null)) {
return;
}
@@ -159,6 +162,31 @@ public class IoTDBPipePermissionIT extends
AbstractPipeSingleIT {
}
TableModelUtils.assertCountData("test", "test", 100, env);
+
+ // test showing pipe
+ // Create another pipe, user is root
+ try (final Connection connection =
env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ "create pipe a2c "
+ + "with source ('database'='test1') "
+ + "with processor('processor'='rename-database-processor',
'processor.new-db-name'='test') "
+ + "with sink ('sink'='write-back-sink')");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail("Create pipe without user shall succeed if use the current
session");
+ }
+
+ // A user shall only see its own pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
+ Assert.assertEquals(
+ 1,
+ client
+ .showPipe(new
TShowPipeReq().setIsTableModel(true).setUserName("thulab"))
+ .pipeInfoList
+ .size());
+ }
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index a9736613e72..d686b8658b6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -353,6 +353,7 @@ public class IoTDBDatabaseIT {
try (final Connection adminCon =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement adminStmt = adminCon.createStatement()) {
adminStmt.execute("create user test 'password123456'");
+ adminStmt.execute("create pipe test ('sink'='do-nothing-sink')");
}
try (final Connection connection =
@@ -561,7 +562,6 @@ public class IoTDBDatabaseIT {
// Only root user is allowed
Assert.assertThrows(SQLException.class, () -> statement.execute("select
* from regions"));
- Assert.assertThrows(SQLException.class, () -> statement.execute("select
* from pipes"));
Assert.assertThrows(SQLException.class, () -> statement.execute("select
* from topics"));
Assert.assertThrows(
SQLException.class, () -> statement.execute("select * from
subscriptions"));
@@ -572,6 +572,12 @@ public class IoTDBDatabaseIT {
SQLException.class, () -> statement.execute("select * from
config_nodes"));
Assert.assertThrows(SQLException.class, () -> statement.execute("select
* from data_nodes"));
+ // Filter out not self-created pipes
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from pipes"),
+
"id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,",
+ Collections.emptySet());
+
// No auth needed
TestUtils.assertResultSetEqual(
statement.executeQuery(
@@ -671,7 +677,7 @@ public class IoTDBDatabaseIT {
TestUtils.assertResultSetEqual(
statement.executeQuery("select id from pipes where creation_time >
0"),
"id,",
- Collections.singleton("a2b,"));
+ new HashSet<>(Arrays.asList("a2b,", "test,")));
TestUtils.assertResultSetEqual(
statement.executeQuery(
"select * from pipe_plugins where plugin_name =
'IOTDB-THRIFT-SINK'"),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 34cbcdf7bdf..af5f803364c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -20,13 +20,17 @@
package org.apache.iotdb.confignode.consensus.response.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.confignode.manager.pipe.source.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -35,10 +39,12 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -75,7 +81,7 @@ public class PipeTableResp implements DataSet {
.filter(pipeMeta ->
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.collect(Collectors.toList()));
} else {
- final String sortedConnectorParametersString =
+ final String sortedSinkParametersString =
allPipeMeta.stream()
.filter(pipeMeta ->
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.findFirst()
@@ -91,18 +97,80 @@ public class PipeTableResp implements DataSet {
.getStaticMeta()
.getSinkParameters()
.toString()
- .equals(sortedConnectorParametersString))
+ .equals(sortedSinkParametersString))
.collect(Collectors.toList()));
}
}
public PipeTableResp filter(
- final Boolean whereClause, final String pipeName, final boolean
isTableModel) {
+ final Boolean whereClause,
+ final String pipeName,
+ final boolean isTableModel,
+ final String userName) {
final PipeTableResp resp = filter(whereClause, pipeName);
- resp.allPipeMeta.removeIf(meta ->
!meta.getStaticMeta().visibleUnder(isTableModel));
+ resp.allPipeMeta.removeIf(
+ meta ->
+ !meta.getStaticMeta().visibleUnder(isTableModel)
+ || !isVisible4User(userName, meta.getStaticMeta()));
return resp;
}
+ /**
+ * Decides whether the pipe described by {@code meta} is shown to {@code userName}.
+ *
+ * <p>The pipe is visible when {@code userName} is null, when the authorizer reports the user as
+ * an admin, or when the source-side or sink-side user check accepts the user.
+ *
+ * @param userName the user the pipe list is being filtered for; null disables per-user filtering
+ * @param meta static meta of the pipe being checked
+ * @return true if the pipe may be shown to the user; false otherwise, including when any
+ *     exception is thrown during the check
+ */
+ public boolean isVisible4User(final String userName, final PipeStaticMeta
meta) {
+ try {
+ return Objects.isNull(userName)
+ || BasicAuthorizer.getInstance().isAdmin(userName)
+ || isVisible4SourceUser(userName, meta.getSourceParameters())
+ || isVisible4SinkUser(userName, meta.getSinkParameters());
+ } catch (final Exception e) {
+ // Fail closed: any failure during the check hides the pipe from this user.
+ return false;
+ }
+ }
+
+ /**
+ * Checks whether {@code userName} equals the user configured on the pipe's source.
+ *
+ * <p>Only the built-in IoTDB extractor/source plugins are considered; for any other source
+ * plugin the result is false.
+ *
+ * @param userName the user to compare against the source's configured user
+ * @param sourceParameters the source parameters of the pipe
+ * @return true if the source is a built-in IoTDB source and its configured user equals {@code
+ *     userName}
+ */
+ private boolean isVisible4SourceUser(
+ final String userName, final PipeParameters sourceParameters) {
+ // Resolve the source plugin name, defaulting to the built-in IoTDB extractor when unset.
+ final String pluginName =
+ sourceParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
+
+ // Any source other than the built-in IoTDB extractor/source never grants visibility.
+ if
(!pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ &&
!pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+ return false;
+ }
+
+ // Objects.equals is null-safe; presumably getStringByKeys yields null when none of the keys
+ // is set, in which case a non-null userName does not match -- TODO confirm.
+ return Objects.equals(
+ userName,
+ sourceParameters.getStringByKeys(
+ PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+ PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+ }
+
+ /**
+ * Checks whether {@code userName} equals the user configured on the pipe's sink.
+ *
+ * <p>Only the built-in write-back connector/sink plugins are considered; for any other sink
+ * plugin the result is false.
+ *
+ * @param userName the user to compare against the sink's configured user
+ * @param sinkParameters the sink parameters of the pipe
+ * @return true if the sink is a write-back sink and its configured user equals {@code userName}
+ */
+ private boolean isVisible4SinkUser(final String userName, final
PipeParameters sinkParameters) {
+ // Resolve the sink plugin name, defaulting to the built-in thrift sink when unset.
+ final String pluginName =
+ sinkParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
+ BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+ .toLowerCase();
+
+ // Any sink other than the write-back connector/sink never grants visibility.
+ if
(!pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+ &&
!pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) {
+ return false;
+ }
+
+ // The sink user must be read with the sink/connector-prefixed keys; the source/extractor
+ // keys belong to the source parameters and are not the keys a sink user is configured under.
+ return Objects.equals(
+ userName,
+ sinkParameters.getStringByKeys(
+ PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
+ PipeSinkConstant.SINK_IOTDB_USER_KEY,
+ PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
+ PipeSinkConstant.SINK_IOTDB_USERNAME_KEY));
+ }
+
public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException
{
final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
for (final PipeMeta pipeMeta : allPipeMeta) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 4aaf3ab46c3..c68c6ef6fa5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -241,7 +241,7 @@ public class PipeTaskCoordinator {
public TShowPipeResp showPipes(final TShowPipeReq req) {
try {
return ((PipeTableResp) configManager.getConsensusManager().read(new
ShowPipePlanV2()))
- .filter(req.whereClause, req.pipeName, req.isTableModel)
+ .filter(req.whereClause, req.pipeName, req.isTableModel,
req.userName)
.convertToTShowPipeResp();
} catch (final ConsensusException e) {
LOGGER.warn("Failed in the read API executing the consensus layer due
to: ", e);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 2fd698112fc..1ce62dc89d1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -153,14 +153,14 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
public static void checkAndEnrichSourceAuthentication(
- final ConfigNodeProcedureEnv env, final Map<String, String>
extractorAttributes) {
- if (Objects.isNull(extractorAttributes)) {
+ final ConfigNodeProcedureEnv env, final Map<String, String>
sourceAttributes) {
+ if (Objects.isNull(sourceAttributes)) {
return;
}
- final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
+ final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
final String pluginName =
- extractorParameters
+ sourceParameters
.getStringOrDefault(
Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
@@ -171,26 +171,26 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
return;
}
- if
(extractorParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY)
- ||
extractorParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY)
- ||
extractorParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY)
- ||
extractorParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY))
{
+ if
(sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)) {
final String hashedPassword =
env.getConfigManager()
.getPermissionManager()
.login4Pipe(
- extractorParameters.getStringByKeys(
+ sourceParameters.getStringByKeys(
PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY),
- extractorParameters.getStringByKeys(
+ sourceParameters.getStringByKeys(
PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
if (Objects.isNull(hashedPassword)) {
throw new PipeException("Authentication failed.");
}
- extractorParameters.addOrReplaceEquivalentAttributes(
+ sourceParameters.addOrReplaceEquivalentAttributes(
new PipeParameters(
Collections.singletonMap(
PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
hashedPassword)));
@@ -198,11 +198,11 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
public static void checkAndEnrichSinkAuthentication(
- final ConfigNodeProcedureEnv env, final Map<String, String>
connectorAttributes) {
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
+ final ConfigNodeProcedureEnv env, final Map<String, String>
sinkAttributes) {
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
final String pluginName =
- connectorParameters
+ sinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
@@ -213,26 +213,26 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
return;
}
- if
(connectorParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY)
- ||
connectorParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USER_KEY)
- ||
connectorParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY)
- ||
connectorParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)) {
+ if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY)
+ || sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USER_KEY)
+ ||
sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY)
+ ||
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)) {
final String hashedPassword =
env.getConfigManager()
.getPermissionManager()
.login4Pipe(
- connectorParameters.getStringByKeys(
+ sinkParameters.getStringByKeys(
PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
PipeSinkConstant.SINK_IOTDB_USER_KEY,
PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
PipeSinkConstant.SINK_IOTDB_USERNAME_KEY),
- connectorParameters.getStringByKeys(
+ sinkParameters.getStringByKeys(
PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
if (Objects.isNull(hashedPassword)) {
throw new PipeException("Authentication failed.");
}
- connectorParameters.addOrReplaceEquivalentAttributes(
+ sinkParameters.addOrReplaceEquivalentAttributes(
new PipeParameters(
Collections.singletonMap(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY,
hashedPassword)));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 55c5410a555..f55414712a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -89,7 +89,7 @@ public class IoTDBConfig {
private static final String DEFAULT_MULTI_DIR_STRATEGY = "SequenceStrategy";
private static final String STORAGE_GROUP_MATCHER =
"([a-zA-Z0-9`_.\\-\\u2E80-\\u9FFF]+)";
- public static final Pattern STORAGE_GROUP_PATTERN =
Pattern.compile(STORAGE_GROUP_MATCHER);
+ public static final Pattern DATABASE_PATTERN =
Pattern.compile(STORAGE_GROUP_MATCHER);
// e.g., a31+/$%#&[]{}3e4, "a.b", 'a.b'
private static final String NODE_NAME_MATCHER = "([^\n\t]+)";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
index a1664c3e2c5..b1757d5087c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
@@ -58,7 +58,7 @@ public class RenameDatabaseProcessor implements PipeProcessor
{
+ "should match the pattern %s, and the length should not
exceed %d",
newDatabaseName,
PATH_SEPARATOR,
- IoTDBConfig.STORAGE_GROUP_PATTERN,
+ IoTDBConfig.DATABASE_PATTERN,
MAX_DATABASE_NAME_LENGTH),
e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index e813a8d494a..3ba3668e990 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -540,11 +540,12 @@ public class InformationSchemaContentSupplierFactory {
private PipeSupplier(final List<TSDataType> dataTypes, final String userName) throws Exception {
super(dataTypes);
- accessControl.checkUserIsAdmin(userName);
try (final ConfigNodeClient client =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
iterator =
- client.showPipe(new TShowPipeReq().setIsTableModel(true)).getPipeInfoListIterator();
+ client
+ .showPipe(new TShowPipeReq().setIsTableModel(true).setUserName(userName))
+ .getPipeInfoListIterator();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 4802929f318..b796cb1e81e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -765,7 +765,7 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
public static void validateDatabaseName(final String dbName) throws SemanticException {
// Check database length here
if (dbName.contains(PATH_SEPARATOR)
- || !IoTDBConfig.STORAGE_GROUP_PATTERN.matcher(dbName).matches()
+ || !IoTDBConfig.DATABASE_PATTERN.matcher(dbName).matches()
|| dbName.length() > MAX_DATABASE_NAME_LENGTH) {
throw new SemanticException(
new IllegalPathException(
@@ -1186,8 +1186,7 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
@Override
protected IConfigTask visitShowPipes(ShowPipes node, MPPQueryContext context) {
context.setQueryType(QueryType.READ);
- accessControl.checkUserIsAdmin(context.getSession().getUserName());
- return new ShowPipeTask(node);
+ return new ShowPipeTask(node, context.getSession().getUserName());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index cd5ddd30543..4df8a6a4f71 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2494,7 +2494,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> showPipes(final ShowPipesStatement showPipesStatement) {
+ public SettableFuture<ConfigTaskResult> showPipes(
+ final ShowPipesStatement showPipesStatement, final String userName) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2506,6 +2507,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
tShowPipeReq.setWhereClause(true);
}
tShowPipeReq.setIsTableModel(showPipesStatement.isTableModel());
+ if (Objects.nonNull(userName)) {
+ tShowPipeReq.setUserName(userName);
+ }
final List<TShowPipeInfo> tShowPipeInfoList =
configNodeClient.showPipe(tShowPipeReq).getPipeInfoList();
ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index c7a9b9af13c..96471f21036 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -223,7 +223,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement);
- SettableFuture<ConfigTaskResult> showPipes(ShowPipesStatement showPipesStatement);
+ SettableFuture<ConfigTaskResult> showPipes(
+ ShowPipesStatement showPipesStatement, String userName);
SettableFuture<ConfigTaskResult> showSubscriptions(
ShowSubscriptionsStatement showSubscriptionsStatement);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 76b8680c378..445b5320ba1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -47,22 +47,24 @@ import java.util.stream.Collectors;
public class ShowPipeTask implements IConfigTask {
private final ShowPipesStatement showPipesStatement;
+ private String userName;
public ShowPipeTask(final ShowPipesStatement showPipesStatement) {
this.showPipesStatement = showPipesStatement;
}
- public ShowPipeTask(final ShowPipes node) {
+ public ShowPipeTask(final ShowPipes node, final String userName) {
showPipesStatement = new ShowPipesStatement();
showPipesStatement.setPipeName(node.getPipeName());
showPipesStatement.setWhereClause(node.hasWhereClause());
showPipesStatement.setTableModel(true);
+ this.userName = userName;
}
@Override
public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.showPipes(showPipesStatement);
+ return configTaskExecutor.showPipes(showPipesStatement, userName);
}
public static void buildTSBlock(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaFormatUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaFormatUtils.java
index 579fb8f9cb6..c70b5d83818 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaFormatUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaFormatUtils.java
@@ -103,7 +103,7 @@ public class MetaFormatUtils {
/** check whether the database name uses illegal characters */
public static void checkDatabase(final String database) throws IllegalPathException {
- if (!IoTDBConfig.STORAGE_GROUP_PATTERN.matcher(database).matches()) {
+ if (!IoTDBConfig.DATABASE_PATTERN.matcher(database).matches()) {
throw new IllegalPathException(
String.format(
"The database name can only contain english or chinese characters, numbers, backticks and underscores. %s",
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
index 6f373dd49ea..77a211de5d2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
@@ -98,9 +98,6 @@ public abstract class BasicAuthorizer implements IAuthorizer, IService {
}
}
- /** Checks if a user has admin privileges */
- protected abstract boolean isAdmin(String username);
-
private void checkAdmin(String username, String errmsg) throws AuthException {
if (isAdmin(username)) {
throw new AuthException(TSStatusCode.NO_PERMISSION, errmsg);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
index 82143345ba5..e98474a8f2a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
@@ -34,6 +34,8 @@ import java.util.Set;
/** This interface provides all authorization-relative operations. */
public interface IAuthorizer extends SnapshotProcessor {
+ boolean isAdmin(String userName);
+
/**
* Login for a user.
*
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/LocalFileAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/LocalFileAuthorizer.java
index 8eead3eceb8..117871f6e33 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/LocalFileAuthorizer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/LocalFileAuthorizer.java
@@ -35,7 +35,7 @@ public class LocalFileAuthorizer extends BasicAuthorizer {
}
@Override
- protected boolean isAdmin(String username) {
+ public boolean isAdmin(String username) {
return config.getAdminName().equals(username);
}
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 37e5acd02a9..4fd8845137a 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -872,6 +872,7 @@ struct TShowPipeReq {
1: optional string pipeName
2: optional bool whereClause
3: optional bool isTableModel
+ 4: optional string userName
}
struct TShowPipeResp {