This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d737f4b301b Stabilize ZSTD compressor level pipe IT (#17917)
d737f4b301b is described below

commit d737f4b301b92deba9178e04dac0fbb380ebcef4
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:48:03 2026 +0800

    Stabilize ZSTD compressor level pipe IT (#17917)
    
    * Strictly isolate pipe tree and table visibility
    
    * Add pipe visibility unit test coverage
    
    * Add pipe static meta visibility edge tests
    
    * Add pipe table visibility filter test
    
    * Fix table pipe RPC visibility in ITs
    
    * Fix zstd compressor level IT assertion
    
    * it-fix
    
    * Update IoTDBPipeSinkCompressionIT.java
    
    * Stabilize zstd compressor level pipe IT
    
    * Keep only zstd compression pipe IT changes
---
 .../enhanced/IoTDBPipeSinkCompressionIT.java       |   4 +-
 .../auto/enhanced/IoTDBPipeSinkCompressionIT.java  | 180 +++++++++------------
 2 files changed, 78 insertions(+), 106 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index 0ab05994fb6..fe2b854863d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -336,7 +336,9 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeTableModelDualManual
       }
 
       final List<TShowPipeInfo> showPipeResult =
-          client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
+          client.showPipe(
+                  new TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
+              .pipeInfoList;
       showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
       Assert.assertEquals(
           3,
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 739148f1523..5424c9f87a7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -223,105 +223,17 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualTreeModelAutoIT
 
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      TestUtils.executeNonQueries(
-          senderEnv,
-          Arrays.asList(
-              "insert into root.db.d1(time, s1) values (1, 1)",
-              "insert into root.db.d1(time, s2) values (1, 1)",
-              "insert into root.db.d1(time, s3) values (1, 1)",
-              "insert into root.db.d1(time, s4) values (1, 1)",
-              "insert into root.db.d1(time, s5) values (1, 1)",
-              "flush"),
-          null);
-
-      // Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
-
-      try (final Connection connection = senderEnv.getConnection();
-          final Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format(
-                "create pipe p1"
-                    + " with source ('source.pattern'='root.db.d1.s1')"
-                    + " with sink ("
-                    + "'sink.ip'='%s',"
-                    + "'sink.port'='%s',"
-                    + "'sink.compressor'='zstd, zstd',"
-                    + "'sink.compressor.zstd.level'='3')",
-                receiverIp, receiverPort));
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
-
-      try (final Connection connection = senderEnv.getConnection();
-          final Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format(
-                "create pipe p2"
-                    + " with source ('source.pattern'='root.db.d1.s2')"
-                    + " with sink ("
-                    + "'sink.ip'='%s',"
-                    + "'sink.port'='%s',"
-                    + "'sink.compressor'='zstd, zstd',"
-                    + "'sink.compressor.zstd.level'='22')",
-                receiverIp, receiverPort));
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
-
-      try (final Connection connection = senderEnv.getConnection();
-          final Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format(
-                "create pipe p3"
-                    + " with source ('source.pattern'='root.db.d1.s3')"
-                    + " with sink ("
-                    + "'sink.ip'='%s',"
-                    + "'sink.port'='%s',"
-                    + "'sink.compressor'='zstd, zstd',"
-                    + "'sink.compressor.zstd.level'='-131072')",
-                receiverIp, receiverPort));
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
-
-      try (final Connection connection = senderEnv.getConnection();
-          final Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format(
-                "create pipe p4"
-                    + " with source ('source.pattern'='root.db.d1.s4')"
-                    + " with sink ("
-                    + "'sink.ip'='%s',"
-                    + "'sink.port'='%s',"
-                    + "'sink.compressor'='zstd, zstd',"
-                    + "'sink.compressor.zstd.level'='-131073')",
-                receiverIp, receiverPort));
-        fail();
-      } catch (SQLException e) {
-        // Make sure the error message in IoTDBConnector.java is returned
-        Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
-      }
-
-      try (final Connection connection = senderEnv.getConnection();
-          final Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format(
-                "create pipe p5"
-                    + " with source ('source.pattern'='root.db.d1.s5')"
-                    + " with sink ("
-                    + "'sink.ip'='%s',"
-                    + "'sink.port'='%s',"
-                    + "'sink.compressor'='zstd, zstd',"
-                    + "'sink.compressor.zstd.level'='23')",
-                receiverIp, receiverPort));
-        fail();
-      } catch (SQLException e) {
-        // Make sure the error message in IoTDBConnector.java is returned
-        Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
-      }
+      // Create legal zstd level pipes one by one, so the assertion identifies the exact level
+      // that fails and avoids concurrent historical TsFile splitting for this level test.
+      createZstdPipeAndAssertData(
+          "p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1", handleFailure);
+      createZstdPipeAndAssertData(
+          "p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2", handleFailure);
+      createZstdPipeAndAssertData(
+          "p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3", handleFailure);
+
+      assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp, receiverPort);
+      assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp, receiverPort);
 
       final List<TShowPipeInfo> showPipeResult =
           client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
@@ -331,13 +243,71 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualTreeModelAutoIT
           showPipeResult.stream()
               .filter(info -> !info.id.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX))
               .count());
+    }
+  }
 
-      TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "count timeseries root.db.**",
-          "count(timeseries),",
-          Collections.singleton("3,"),
-          handleFailure);
+  private void createZstdPipeAndAssertData(
+      final String pipeName,
+      final String sourcePattern,
+      final String zstdLevel,
+      final String receiverIp,
+      final int receiverPort,
+      final String measurement,
+      final Consumer<String> handleFailure) {
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            String.format("insert into root.db.d1(time, %s) values (1, 1)", measurement), "flush"),
+        null);
+
+    try {
+      createZstdPipe(pipeName, sourcePattern, zstdLevel, receiverIp, receiverPort);
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        String.format("select count(%s) from root.db.d1", measurement),
+        String.format("count(root.db.d1.%s),", measurement),
+        Collections.singleton("1,"),
+        handleFailure);
+  }
+
+  private void assertCreateZstdPipeFailed(
+      final String pipeName,
+      final String sourcePattern,
+      final String zstdLevel,
+      final String receiverIp,
+      final int receiverPort) {
+    try {
+      createZstdPipe(pipeName, sourcePattern, zstdLevel, receiverIp, receiverPort);
+      fail();
+    } catch (final SQLException e) {
+      Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
+    }
+  }
+
+  private void createZstdPipe(
+      final String pipeName,
+      final String sourcePattern,
+      final String zstdLevel,
+      final String receiverIp,
+      final int receiverPort)
+      throws SQLException {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ('source.pattern'='%s')"
+                  + " with sink ("
+                  + "'sink.ip'='%s',"
+                  + "'sink.port'='%s',"
+                  + "'sink.compressor'='zstd, zstd',"
+                  + "'sink.compressor.zstd.level'='%s')",
+              pipeName, sourcePattern, receiverIp, receiverPort, zstdLevel));
     }
   }
 }

Reply via email to