This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-stuck-by-file
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fix-pipe-stuck-by-file by this push:
     new 3e43a341244 Subscription IT: flush after realtime data insertion in IoTDBDefaultPullConsumerDataSetIT (#15089)
3e43a341244 is described below

commit 3e43a34124425f18399e5eefdd75747ef3240d0a
Author: VGalaxies <[email protected]>
AuthorDate: Fri Mar 14 01:10:31 2025 +0800

    Subscription IT: flush after realtime data insertion in IoTDBDefaultPullConsumerDataSetIT (#15089)
---
 .../IoTDBDefaultPullConsumerDataSetIT.java         | 30 ++++++++++++++--------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
index fff68412db4..0b36deed5eb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
@@ -32,7 +32,6 @@ import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -42,6 +41,8 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2SubscriptionRegressionMisc.class})
 public class IoTDBDefaultPullConsumerDataSetIT extends AbstractSubscriptionRegressionIT {
@@ -92,7 +93,6 @@ public class IoTDBDefaultPullConsumerDataSetIT extends AbstractSubscriptionRegre
     session_src.insertTablet(tablet);
   }
 
-  @Ignore
   @Test
   public void do_test()
       throws InterruptedException,
@@ -119,10 +119,15 @@ public class IoTDBDefaultPullConsumerDataSetIT extends AbstractSubscriptionRegre
     String sql = "select count(s_0) from " + databasePrefix + "0.d_0";
     System.out.println(FORMAT.format(new Date()) + " src: " + getCount(session_src, sql));
     // Consumption data
-    consume_data(consumer, session_dest);
-    for (int i = 0; i < deviceCount; i++) {
-      check_count(10, "select count(s_0) from " + devices.get(i), i + ":Consumption Data:s_0");
-    }
+    AWAIT.untilAsserted(
+        () -> {
+          session_src.executeNonQueryStatement("flush");
+          consume_data(consumer, session_dest);
+          for (int i = 0; i < deviceCount; i++) {
+            check_count(
                10, "select count(s_0) from " + devices.get(i), i + ":Consumption Data:s_0");
+          }
+        });
     // Unsubscribe
     consumer.unsubscribe(topicName);
     // Unsubscribe and then write data
@@ -135,9 +140,14 @@ public class IoTDBDefaultPullConsumerDataSetIT extends 
AbstractSubscriptionRegre
     System.out.println(FORMAT.format(new Date()) + " src: " + getCount(session_src, sql));
     // Consumption data: Progress is not retained when re-subscribing after cancellation. Full
     // synchronization.
-    consume_data(consumer, session_dest);
-    for (int i = 0; i < deviceCount; i++) {
-      check_count(15, "select count(s_0) from " + devices.get(i), i + ":consume data again:s_0");
-    }
+    AWAIT.untilAsserted(
+        () -> {
+          session_src.executeNonQueryStatement("flush");
+          consume_data(consumer, session_dest);
+          for (int i = 0; i < deviceCount; i++) {
+            check_count(
                15, "select count(s_0) from " + devices.get(i), i + ":consume data again:s_0");
+          }
+        });
   }
 }

Reply via email to