This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d1ae3ce1a2e Add IT for DELETE TIMESERIES replica consistency under IoTConsensusV2 (#17332)
d1ae3ce1a2e is described below
commit d1ae3ce1a2e957401a71d6f815900d1b943f15e9
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Mar 27 11:24:47 2026 +0800
Add IT for DELETE TIMESERIES replica consistency under IoTConsensusV2 (#17332)
Add testDeleteTimeSeriesReplicaConsistency() to verify that DELETE
TIMESERIES operations are properly replicated across all DataNode
replicas in a 3C3D IoTConsensusV2 cluster. This test reproduces the
scenario from the historical deletion replication bug where deletion
events lacking replicateIndex were silently dropped by the Processor.
The test inserts data with 3 measurements, leaves some data unflushed,
deletes one timeseries, then verifies schema consistency on every
DataNode — including after stopping and restarting each node in turn
to trigger consensus pipe reconstruction and historical replay.
Also unifies INSERTION constants to use 3 columns (speed, temperature,
power) across all test methods, removing the prior 2-column variants.
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../IoTDBIoTConsensusV23C3DBasicITBase.java | 210 ++++++++++++++++++++-
.../batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java | 6 +
.../IoTDBIoTConsensusV2Stream3C3DBasicIT.java | 6 +
3 files changed, 213 insertions(+), 9 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
index 9544ac5cf2b..ec04aab39bd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
@@ -37,6 +37,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;
protected static final String INSERTION1 =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (100, 1, 2, 3)";
protected static final String INSERTION2 =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (101, 4, 5, 6)";
protected static final String INSERTION3 =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)";
+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (102, 7, 8, 9)";
protected static final String FLUSH_COMMAND = "flush on cluster";
protected static final String COUNT_QUERY = "select count(*) from root.sg.**";
- protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1";
+ protected static final String SELECT_ALL_QUERY =
+ "select speed, temperature, power from root.sg.d1";
+ protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES root.sg.d1.speed";
+ protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*";
+ protected static final String SELECT_SURVIVING_QUERY =
+ "SELECT temperature, power FROM root.sg.d1";
/**
* Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link
@@ -210,6 +217,187 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
}
}
+ /**
+ * Test that DELETE TIMESERIES is properly replicated to all DataNode replicas via IoTConsensusV2.
+ *
+ * <p>This test reproduces the scenario from the historical deletion replication bug: when a
+ * timeseries is deleted after data insertion (with some unflushed data), the deletion event must
+ * be consistently replicated to all replicas. After waiting for replication to complete, stopping
+ * each DataNode in turn should show the same schema on all surviving nodes.
+ *
+ * <p>Scenario:
+ *
+ * <ol>
+ * <li>Insert data into root.sg.d1 with 3 measurements (speed, temperature, power), flush
+ * <li>Insert more data (unflushed to create WAL-only entries)
+ * <li>DELETE TIMESERIES root.sg.d1.speed
+ * <li>Flush again to persist deletion
+ * <li>Wait for replication to complete on all DataNodes
+ * <li>Verify that every DataNode independently shows the same timeseries (speed is gone)
+ * </ol>
+ */
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement = makeItCloseQuietly(connection.createStatement())) {
+
+ // Step 1: Insert data with 3 measurements and flush
+ LOGGER.info(
+ "Step 1: Inserting data with 3 measurements and flushing (mode: {})...",
+ getIoTConsensusV2Mode());
+ statement.execute(INSERTION1);
+ statement.execute(INSERTION2);
+ statement.execute(FLUSH_COMMAND);
+
+ // Step 2: Insert more data without flush (creates WAL-only entries)
+ LOGGER.info("Step 2: Inserting more data without flush (WAL-only entries)...");
+ statement.execute(INSERTION3);
+
+ // Step 3: Delete one timeseries
+ LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed...");
+ statement.execute(DELETE_TIMESERIES_SPEED);
+
+ // Step 4: Flush again to persist the deletion
+ LOGGER.info("Step 4: Flushing to persist deletion...");
+ statement.execute(FLUSH_COMMAND);
+
+ // Verify on the current connection: speed should be gone, 2 timeseries remain
+ verifyTimeSeriesAfterDelete(statement, "via initial connection");
+
+ // Step 5: Wait for replication to complete on data region leaders
+ LOGGER.info("Step 5: Waiting for replication to complete on data region leaders...");
+ Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMap =
+ getDataRegionMapWithLeader(statement);
+ Set<Integer> leaderNodeIds = new HashSet<>();
+ for (Pair<Integer, Set<Integer>> leaderAndReplicas : dataRegionMap.values()) {
+ if (leaderAndReplicas.getLeft() > 0) {
+ leaderNodeIds.add(leaderAndReplicas.getLeft());
+ }
+ }
+ for (int leaderNodeId : leaderNodeIds) {
+ EnvFactory.getEnv()
+ .dataNodeIdToWrapper(leaderNodeId)
+ .ifPresent(this::waitForReplicationComplete);
+ }
+
+ // Step 6: Verify schema consistency on each DataNode independently
+ LOGGER.info("Step 6: Verifying schema consistency on each DataNode independently...");
+ List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
+ for (DataNodeWrapper wrapper : dataNodeWrappers) {
+ String nodeDescription = "DataNode " + wrapper.getIp() + ":" + wrapper.getPort();
+ LOGGER.info("Verifying schema on {}", nodeDescription);
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (Connection nodeConn =
+ makeItCloseQuietly(
+ EnvFactory.getEnv()
+ .getConnection(
+ wrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT));
+ Statement nodeStmt = makeItCloseQuietly(nodeConn.createStatement())) {
+ verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription);
+ }
+ });
+ }
+
+ // Step 7: Stop each DataNode one by one and verify remaining nodes still consistent
+ LOGGER.info(
+ "Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema...");
+ for (DataNodeWrapper stoppedNode : dataNodeWrappers) {
+ String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" + stoppedNode.getPort();
+ LOGGER.info("Stopping {}", stoppedDesc);
+ stoppedNode.stopForcibly();
+ Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive());
+
+ try {
+ // Verify schema on each surviving node
+ for (DataNodeWrapper aliveNode : dataNodeWrappers) {
+ if (aliveNode == stoppedNode) {
+ continue;
+ }
+ String aliveDesc = "DataNode " + aliveNode.getIp() + ":" + aliveNode.getPort();
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (Connection aliveConn =
+ makeItCloseQuietly(
+ EnvFactory.getEnv()
+ .getConnection(
+ aliveNode,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT));
+ Statement aliveStmt = makeItCloseQuietly(aliveConn.createStatement())) {
+ verifyTimeSeriesAfterDelete(
+ aliveStmt, aliveDesc + " (while " + stoppedDesc + " is down)");
+ }
+ });
+ }
+ } finally {
+ // Restart the stopped node before moving to the next iteration
+ LOGGER.info("Restarting {}", stoppedDesc);
+ stoppedNode.start();
+ // Wait for the restarted node to rejoin
+ Awaitility.await()
+ .atMost(120, TimeUnit.SECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .until(stoppedNode::isAlive);
+ }
+ }
+
+ LOGGER.info(
+ "DELETE TIMESERIES replica consistency test passed for mode: {}",
+ getIoTConsensusV2Mode());
+ }
+ }
+
+ /**
+ * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and
+ * that data queries do not return the deleted timeseries.
+ */
+ private void verifyTimeSeriesAfterDelete(Statement statement, String context) throws Exception {
+ // Verify via SHOW TIMESERIES: speed should be gone, only temperature and power remain
+ Set<String> timeseries = new HashSet<>();
+ try (ResultSet resultSet = statement.executeQuery(SHOW_TIMESERIES_D1)) {
+ while (resultSet.next()) {
+ timeseries.add(resultSet.getString("Timeseries"));
+ }
+ }
+ LOGGER.info("[{}] SHOW TIMESERIES result: {}", context, timeseries);
+ Assert.assertEquals(
+ "[" + context + "] Expected exactly 2 timeseries after delete (temperature, power)",
+ 2,
+ timeseries.size());
+ Assert.assertFalse(
+ "[" + context + "] root.sg.d1.speed should have been deleted",
+ timeseries.contains("root.sg.d1.speed"));
+ Assert.assertTrue(
+ "[" + context + "] root.sg.d1.temperature should still exist",
+ timeseries.contains("root.sg.d1.temperature"));
+ Assert.assertTrue(
+ "[" + context + "] root.sg.d1.power should still exist",
+ timeseries.contains("root.sg.d1.power"));
+
+ // Verify via SELECT: only temperature and power columns should return data
+ try (ResultSet selectResult = statement.executeQuery(SELECT_SURVIVING_QUERY)) {
+ int rowCount = 0;
+ while (selectResult.next()) {
+ rowCount++;
+ }
+ // After delete, remaining data depends on whether unflushed data for the deleted
+ // timeseries was also cleaned up. We mainly verify that the query doesn't fail
+ // and that some rows are returned for the surviving measurements.
+ Assert.assertTrue(
+ "[" + context + "] Expected at least 1 row from SELECT on surviving timeseries",
+ rowCount >= 1);
+ }
+ }
+
private static final Pattern SYNC_LAG_PATTERN =
Pattern.compile("iot_consensus_v2\\{[^}]*type=\"syncLag\"[^}]*}\\s+(\\S+)");
@@ -259,7 +447,7 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
totalCount += parseLongFromString(countResult.getString(i));
}
Assert.assertEquals(
- "Expected 6 total data points (3 timestamps x 2 measurements)", 6, totalCount);
+ "Expected 9 total data points (3 timestamps x 3 measurements)", 9, totalCount);
}
int rowCount = 0;
@@ -269,15 +457,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
long timestamp = parseLongFromString(selectResult.getString(1));
long speed = parseLongFromString(selectResult.getString(2));
long temperature = parseLongFromString(selectResult.getString(3));
+ long power = parseLongFromString(selectResult.getString(4));
if (timestamp == 100) {
Assert.assertEquals(1, speed);
Assert.assertEquals(2, temperature);
+ Assert.assertEquals(3, power);
} else if (timestamp == 101) {
- Assert.assertEquals(3, speed);
- Assert.assertEquals(4, temperature);
+ Assert.assertEquals(4, speed);
+ Assert.assertEquals(5, temperature);
+ Assert.assertEquals(6, power);
} else if (timestamp == 102) {
- Assert.assertEquals(5, speed);
- Assert.assertEquals(6, temperature);
+ Assert.assertEquals(7, speed);
+ Assert.assertEquals(8, temperature);
+ Assert.assertEquals(9, power);
}
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
index f71462fa470..bb97014d213 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/batch/IoTDBIoTConsensusV2Batch3C3DBasicIT.java
@@ -49,4 +49,10 @@ public class IoTDBIoTConsensusV2Batch3C3DBasicIT extends IoTDBIoTConsensusV23C3D
public void test3C3DWriteFlushAndQuery() throws Exception {
super.test3C3DWriteFlushAndQuery();
}
+
+ @Override
+ @Test
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ super.testDeleteTimeSeriesReplicaConsistency();
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
index 856d3624bf1..d4c0bf22ab4 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/stream/IoTDBIoTConsensusV2Stream3C3DBasicIT.java
@@ -49,4 +49,10 @@ public class IoTDBIoTConsensusV2Stream3C3DBasicIT extends IoTDBIoTConsensusV23C3
public void test3C3DWriteFlushAndQuery() throws Exception {
super.test3C3DWriteFlushAndQuery();
}
+
+ @Override
+ @Test
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ super.testDeleteTimeSeriesReplicaConsistency();
+ }
}