This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d737f4b301b Stabilize ZSTD compressor level pipe IT (#17917)
d737f4b301b is described below
commit d737f4b301b92deba9178e04dac0fbb380ebcef4
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:48:03 2026 +0800
Stabilize ZSTD compressor level pipe IT (#17917)
* Strictly isolate pipe tree and table visibility
* Add pipe visibility unit test coverage
* Add pipe static meta visibility edge tests
* Add pipe table visibility filter test
* Fix table pipe RPC visibility in ITs
* Fix zstd compressor level IT assertion
* it-fix
* Update IoTDBPipeSinkCompressionIT.java
* Stabilize zstd compressor level pipe IT
* Keep only zstd compression pipe IT changes
---
.../enhanced/IoTDBPipeSinkCompressionIT.java | 4 +-
.../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 180 +++++++++------------
2 files changed, 78 insertions(+), 106 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index 0ab05994fb6..fe2b854863d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -336,7 +336,9 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeTableModelDualManual
}
final List<TShowPipeInfo> showPipeResult =
- client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+ .pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(
3,
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 739148f1523..5424c9f87a7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -223,105 +223,17 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualTreeModelAutoIT
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueries(
- senderEnv,
- Arrays.asList(
- "insert into root.db.d1(time, s1) values (1, 1)",
- "insert into root.db.d1(time, s2) values (1, 1)",
- "insert into root.db.d1(time, s3) values (1, 1)",
- "insert into root.db.d1(time, s4) values (1, 1)",
- "insert into root.db.d1(time, s5) values (1, 1)",
- "flush"),
- null);
-
- // Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
-
- try (final Connection connection = senderEnv.getConnection();
- final Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "create pipe p1"
- + " with source ('source.pattern'='root.db.d1.s1')"
- + " with sink ("
- + "'sink.ip'='%s',"
- + "'sink.port'='%s',"
- + "'sink.compressor'='zstd, zstd',"
- + "'sink.compressor.zstd.level'='3')",
- receiverIp, receiverPort));
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- try (final Connection connection = senderEnv.getConnection();
- final Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "create pipe p2"
- + " with source ('source.pattern'='root.db.d1.s2')"
- + " with sink ("
- + "'sink.ip'='%s',"
- + "'sink.port'='%s',"
- + "'sink.compressor'='zstd, zstd',"
- + "'sink.compressor.zstd.level'='22')",
- receiverIp, receiverPort));
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- try (final Connection connection = senderEnv.getConnection();
- final Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "create pipe p3"
- + " with source ('source.pattern'='root.db.d1.s3')"
- + " with sink ("
- + "'sink.ip'='%s',"
- + "'sink.port'='%s',"
- + "'sink.compressor'='zstd, zstd',"
- + "'sink.compressor.zstd.level'='-131072')",
- receiverIp, receiverPort));
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- try (final Connection connection = senderEnv.getConnection();
- final Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "create pipe p4"
- + " with source ('source.pattern'='root.db.d1.s4')"
- + " with sink ("
- + "'sink.ip'='%s',"
- + "'sink.port'='%s',"
- + "'sink.compressor'='zstd, zstd',"
- + "'sink.compressor.zstd.level'='-131073')",
- receiverIp, receiverPort));
- fail();
- } catch (SQLException e) {
- // Make sure the error message in IoTDBConnector.java is returned
- Assert.assertTrue(e.getMessage().contains("Zstd compression level
should be in the range"));
- }
-
- try (final Connection connection = senderEnv.getConnection();
- final Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "create pipe p5"
- + " with source ('source.pattern'='root.db.d1.s5')"
- + " with sink ("
- + "'sink.ip'='%s',"
- + "'sink.port'='%s',"
- + "'sink.compressor'='zstd, zstd',"
- + "'sink.compressor.zstd.level'='23')",
- receiverIp, receiverPort));
- fail();
- } catch (SQLException e) {
- // Make sure the error message in IoTDBConnector.java is returned
- Assert.assertTrue(e.getMessage().contains("Zstd compression level
should be in the range"));
- }
+ // Create legal zstd level pipes one by one, so the assertion identifies the exact level
+ // that fails and avoids concurrent historical TsFile splitting for this level test.
+ createZstdPipeAndAssertData(
+ "p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1",
handleFailure);
+ createZstdPipeAndAssertData(
+ "p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2",
handleFailure);
+ createZstdPipeAndAssertData(
+ "p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3",
handleFailure);
+
+ assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp,
receiverPort);
+ assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp,
receiverPort);
final List<TShowPipeInfo> showPipeResult =
client.showPipe(new
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
@@ -331,13 +243,71 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualTreeModelAutoIT
showPipeResult.stream()
.filter(info ->
!info.id.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX))
.count());
+ }
+ }
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "count timeseries root.db.**",
- "count(timeseries),",
- Collections.singleton("3,"),
- handleFailure);
+ private void createZstdPipeAndAssertData(
+ final String pipeName,
+ final String sourcePattern,
+ final String zstdLevel,
+ final String receiverIp,
+ final int receiverPort,
+ final String measurement,
+ final Consumer<String> handleFailure) {
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ String.format("insert into root.db.d1(time, %s) values (1, 1)",
measurement), "flush"),
+ null);
+
+ try {
+ createZstdPipe(pipeName, sourcePattern, zstdLevel, receiverIp,
receiverPort);
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ String.format("select count(%s) from root.db.d1", measurement),
+ String.format("count(root.db.d1.%s),", measurement),
+ Collections.singleton("1,"),
+ handleFailure);
+ }
+
+ private void assertCreateZstdPipeFailed(
+ final String pipeName,
+ final String sourcePattern,
+ final String zstdLevel,
+ final String receiverIp,
+ final int receiverPort) {
+ try {
+ createZstdPipe(pipeName, sourcePattern, zstdLevel, receiverIp,
receiverPort);
+ fail();
+ } catch (final SQLException e) {
+ Assert.assertTrue(e.getMessage().contains("Zstd compression level should
be in the range"));
+ }
+ }
+
+ private void createZstdPipe(
+ final String pipeName,
+ final String sourcePattern,
+ final String zstdLevel,
+ final String receiverIp,
+ final int receiverPort)
+ throws SQLException {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ('source.pattern'='%s')"
+ + " with sink ("
+ + "'sink.ip'='%s',"
+ + "'sink.port'='%s',"
+ + "'sink.compressor'='zstd, zstd',"
+ + "'sink.compressor.zstd.level'='%s')",
+ pipeName, sourcePattern, receiverIp, receiverPort, zstdLevel));
}
}
}