This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d1ae3ce1a2e Add IT for DELETE TIMESERIES replica consistency under IoTConsensusV2 (#17332)
d1ae3ce1a2e is described below

commit d1ae3ce1a2e957401a71d6f815900d1b943f15e9
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Mar 27 11:24:47 2026 +0800

    Add IT for DELETE TIMESERIES replica consistency under IoTConsensusV2 (#17332)
    
    Add testDeleteTimeSeriesReplicaConsistency() to verify that DELETE
    TIMESERIES operations are properly replicated across all DataNode
    replicas in a 3C3D IoTConsensusV2 cluster. This test reproduces the
    scenario from the historical deletion replication bug where deletion
    events lacking replicateIndex were silently dropped by the Processor.
    
    The test inserts data with 3 measurements, leaves some data unflushed,
    deletes one timeseries, then verifies schema consistency on every
    DataNode — including after stopping and restarting each node in turn
    to trigger consensus pipe reconstruction and historical replay.
    
    Also unifies INSERTION constants to use 3 columns (speed, temperature,
    power) across all test methods, removing the prior 2-column variants.
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../IoTDBIoTConsensusV23C3DBasicITBase.java        | 210 ++++++++++++++++++++-
 .../batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java |   6 +
 .../IoTDBIoTConsensusV2Stream3C3DBasicIT.java      |   6 +
 3 files changed, 213 insertions(+), 9 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
index 9544ac5cf2b..ec04aab39bd 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
@@ -37,6 +37,8 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
   protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;
 
   protected static final String INSERTION1 =
-      "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
+      "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES 
(100, 1, 2, 3)";
   protected static final String INSERTION2 =
-      "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
+      "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES 
(101, 4, 5, 6)";
   protected static final String INSERTION3 =
-      "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)";
+      "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES 
(102, 7, 8, 9)";
   protected static final String FLUSH_COMMAND = "flush on cluster";
   protected static final String COUNT_QUERY = "select count(*) from 
root.sg.**";
-  protected static final String SELECT_ALL_QUERY = "select speed, temperature 
from root.sg.d1";
+  protected static final String SELECT_ALL_QUERY =
+      "select speed, temperature, power from root.sg.d1";
+  protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES 
root.sg.d1.speed";
+  protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES 
root.sg.d1.*";
+  protected static final String SELECT_SURVIVING_QUERY =
+      "SELECT temperature, power FROM root.sg.d1";
 
   /**
    * Returns IoTConsensusV2 mode: {@link 
ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link
@@ -210,6 +217,187 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
     }
   }
 
+  /**
+   * Test that DELETE TIMESERIES is properly replicated to all DataNode 
replicas via IoTConsensusV2.
+   *
+   * <p>This test reproduces the scenario from the historical deletion 
replication bug: when a
+   * timeseries is deleted after data insertion (with some unflushed data), 
the deletion event must
+   * be consistently replicated to all replicas. After waiting for replication 
to complete, stopping
+   * each DataNode in turn should show the same schema on all surviving nodes.
+   *
+   * <p>Scenario:
+   *
+   * <ol>
+   *   <li>Insert data into root.sg.d1 with 3 measurements (speed, 
temperature, power), flush
+   *   <li>Insert more data (unflushed to create WAL-only entries)
+   *   <li>DELETE TIMESERIES root.sg.d1.speed
+   *   <li>Flush again to persist deletion
+   *   <li>Wait for replication to complete on all DataNodes
+   *   <li>Verify that every DataNode independently shows the same timeseries 
(speed is gone)
+   * </ol>
+   */
+  public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+    try (Connection connection = 
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+        Statement statement = 
makeItCloseQuietly(connection.createStatement())) {
+
+      // Step 1: Insert data with 3 measurements and flush
+      LOGGER.info(
+          "Step 1: Inserting data with 3 measurements and flushing (mode: 
{})...",
+          getIoTConsensusV2Mode());
+      statement.execute(INSERTION1);
+      statement.execute(INSERTION2);
+      statement.execute(FLUSH_COMMAND);
+
+      // Step 2: Insert more data without flush (creates WAL-only entries)
+      LOGGER.info("Step 2: Inserting more data without flush (WAL-only 
entries)...");
+      statement.execute(INSERTION3);
+
+      // Step 3: Delete one timeseries
+      LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed...");
+      statement.execute(DELETE_TIMESERIES_SPEED);
+
+      // Step 4: Flush again to persist the deletion
+      LOGGER.info("Step 4: Flushing to persist deletion...");
+      statement.execute(FLUSH_COMMAND);
+
+      // Verify on the current connection: speed should be gone, 2 timeseries 
remain
+      verifyTimeSeriesAfterDelete(statement, "via initial connection");
+
+      // Step 5: Wait for replication to complete on data region leaders
+      LOGGER.info("Step 5: Waiting for replication to complete on data region 
leaders...");
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMap =
+          getDataRegionMapWithLeader(statement);
+      Set<Integer> leaderNodeIds = new HashSet<>();
+      for (Pair<Integer, Set<Integer>> leaderAndReplicas : 
dataRegionMap.values()) {
+        if (leaderAndReplicas.getLeft() > 0) {
+          leaderNodeIds.add(leaderAndReplicas.getLeft());
+        }
+      }
+      for (int leaderNodeId : leaderNodeIds) {
+        EnvFactory.getEnv()
+            .dataNodeIdToWrapper(leaderNodeId)
+            .ifPresent(this::waitForReplicationComplete);
+      }
+
+      // Step 6: Verify schema consistency on each DataNode independently
+      LOGGER.info("Step 6: Verifying schema consistency on each DataNode 
independently...");
+      List<DataNodeWrapper> dataNodeWrappers = 
EnvFactory.getEnv().getDataNodeWrapperList();
+      for (DataNodeWrapper wrapper : dataNodeWrappers) {
+        String nodeDescription = "DataNode " + wrapper.getIp() + ":" + 
wrapper.getPort();
+        LOGGER.info("Verifying schema on {}", nodeDescription);
+        Awaitility.await()
+            .atMost(60, TimeUnit.SECONDS)
+            .untilAsserted(
+                () -> {
+                  try (Connection nodeConn =
+                          makeItCloseQuietly(
+                              EnvFactory.getEnv()
+                                  .getConnection(
+                                      wrapper,
+                                      SessionConfig.DEFAULT_USER,
+                                      SessionConfig.DEFAULT_PASSWORD,
+                                      BaseEnv.TREE_SQL_DIALECT));
+                      Statement nodeStmt = 
makeItCloseQuietly(nodeConn.createStatement())) {
+                    verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription);
+                  }
+                });
+      }
+
+      // Step 7: Stop each DataNode one by one and verify remaining nodes 
still consistent
+      LOGGER.info(
+          "Step 7: Stopping each DataNode in turn and verifying remaining 
nodes show consistent schema...");
+      for (DataNodeWrapper stoppedNode : dataNodeWrappers) {
+        String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" + 
stoppedNode.getPort();
+        LOGGER.info("Stopping {}", stoppedDesc);
+        stoppedNode.stopForcibly();
+        Assert.assertFalse(stoppedDesc + " should be stopped", 
stoppedNode.isAlive());
+
+        try {
+          // Verify schema on each surviving node
+          for (DataNodeWrapper aliveNode : dataNodeWrappers) {
+            if (aliveNode == stoppedNode) {
+              continue;
+            }
+            String aliveDesc = "DataNode " + aliveNode.getIp() + ":" + 
aliveNode.getPort();
+            Awaitility.await()
+                .pollDelay(1, TimeUnit.SECONDS)
+                .atMost(90, TimeUnit.SECONDS)
+                .untilAsserted(
+                    () -> {
+                      try (Connection aliveConn =
+                              makeItCloseQuietly(
+                                  EnvFactory.getEnv()
+                                      .getConnection(
+                                          aliveNode,
+                                          SessionConfig.DEFAULT_USER,
+                                          SessionConfig.DEFAULT_PASSWORD,
+                                          BaseEnv.TREE_SQL_DIALECT));
+                          Statement aliveStmt = 
makeItCloseQuietly(aliveConn.createStatement())) {
+                        verifyTimeSeriesAfterDelete(
+                            aliveStmt, aliveDesc + " (while " + stoppedDesc + 
" is down)");
+                      }
+                    });
+          }
+        } finally {
+          // Restart the stopped node before moving to the next iteration
+          LOGGER.info("Restarting {}", stoppedDesc);
+          stoppedNode.start();
+          // Wait for the restarted node to rejoin
+          Awaitility.await()
+              .atMost(120, TimeUnit.SECONDS)
+              .pollInterval(2, TimeUnit.SECONDS)
+              .until(stoppedNode::isAlive);
+        }
+      }
+
+      LOGGER.info(
+          "DELETE TIMESERIES replica consistency test passed for mode: {}",
+          getIoTConsensusV2Mode());
+    }
+  }
+
+  /**
+   * Verify that after deleting root.sg.d1.speed, only temperature and power 
timeseries remain, and
+   * that data queries do not return the deleted timeseries.
+   */
+  private void verifyTimeSeriesAfterDelete(Statement statement, String 
context) throws Exception {
+    // Verify via SHOW TIMESERIES: speed should be gone, only temperature and 
power remain
+    Set<String> timeseries = new HashSet<>();
+    try (ResultSet resultSet = statement.executeQuery(SHOW_TIMESERIES_D1)) {
+      while (resultSet.next()) {
+        timeseries.add(resultSet.getString("Timeseries"));
+      }
+    }
+    LOGGER.info("[{}] SHOW TIMESERIES result: {}", context, timeseries);
+    Assert.assertEquals(
+        "[" + context + "] Expected exactly 2 timeseries after delete 
(temperature, power)",
+        2,
+        timeseries.size());
+    Assert.assertFalse(
+        "[" + context + "] root.sg.d1.speed should have been deleted",
+        timeseries.contains("root.sg.d1.speed"));
+    Assert.assertTrue(
+        "[" + context + "] root.sg.d1.temperature should still exist",
+        timeseries.contains("root.sg.d1.temperature"));
+    Assert.assertTrue(
+        "[" + context + "] root.sg.d1.power should still exist",
+        timeseries.contains("root.sg.d1.power"));
+
+    // Verify via SELECT: only temperature and power columns should return data
+    try (ResultSet selectResult = 
statement.executeQuery(SELECT_SURVIVING_QUERY)) {
+      int rowCount = 0;
+      while (selectResult.next()) {
+        rowCount++;
+      }
+      // After delete, remaining data depends on whether unflushed data for 
the deleted
+      // timeseries was also cleaned up. We mainly verify that the query 
doesn't fail
+      // and that some rows are returned for the surviving measurements.
+      Assert.assertTrue(
+          "[" + context + "] Expected at least 1 row from SELECT on surviving 
timeseries",
+          rowCount >= 1);
+    }
+  }
+
   private static final Pattern SYNC_LAG_PATTERN =
       
Pattern.compile("iot_consensus_v2\\{[^}]*type=\"syncLag\"[^}]*}\\s+(\\S+)");
 
@@ -259,7 +447,7 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
         totalCount += parseLongFromString(countResult.getString(i));
       }
       Assert.assertEquals(
-          "Expected 6 total data points (3 timestamps x 2 measurements)", 6, 
totalCount);
+          "Expected 9 total data points (3 timestamps x 3 measurements)", 9, 
totalCount);
     }
 
     int rowCount = 0;
@@ -269,15 +457,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
         long timestamp = parseLongFromString(selectResult.getString(1));
         long speed = parseLongFromString(selectResult.getString(2));
         long temperature = parseLongFromString(selectResult.getString(3));
+        long power = parseLongFromString(selectResult.getString(4));
         if (timestamp == 100) {
           Assert.assertEquals(1, speed);
           Assert.assertEquals(2, temperature);
+          Assert.assertEquals(3, power);
         } else if (timestamp == 101) {
-          Assert.assertEquals(3, speed);
-          Assert.assertEquals(4, temperature);
+          Assert.assertEquals(4, speed);
+          Assert.assertEquals(5, temperature);
+          Assert.assertEquals(6, power);
         } else if (timestamp == 102) {
-          Assert.assertEquals(5, speed);
-          Assert.assertEquals(6, temperature);
+          Assert.assertEquals(7, speed);
+          Assert.assertEquals(8, temperature);
+          Assert.assertEquals(9, power);
         }
       }
     }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
index f71462fa470..bb97014d213 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
@@ -49,4 +49,10 @@ public class IoTDBIoTConsensusV2Batch3C3DBasicIT extends 
IoTDBIoTConsensusV23C3D
   public void test3C3DWriteFlushAndQuery() throws Exception {
     super.test3C3DWriteFlushAndQuery();
   }
+
+  @Override
+  @Test
+  public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+    super.testDeleteTimeSeriesReplicaConsistency();
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
index 856d3624bf1..d4c0bf22ab4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
@@ -49,4 +49,10 @@ public class IoTDBIoTConsensusV2Stream3C3DBasicIT extends 
IoTDBIoTConsensusV23C3
   public void test3C3DWriteFlushAndQuery() throws Exception {
     super.test3C3DWriteFlushAndQuery();
   }
+
+  @Override
+  @Test
+  public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+    super.testDeleteTimeSeriesReplicaConsistency();
+  }
 }

Reply via email to