This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_ratis_restart
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7985d1af9ab153a96f25bfd0165ec9cb4bb3d83d
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 21 19:08:51 2025 +0800

    fix ratis restart
---
 .../apache/iotdb/db/it/IoTDBRestartRatisIT.java    | 372 +++++++++++++++++++++
 1 file changed, 372 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartRatisIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartRatisIT.java
new file mode 100644
index 00000000000..d871df91def
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartRatisIT.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.*;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBRestartRatisIT {
+
+  private final Logger logger = 
LoggerFactory.getLogger(IoTDBRestartRatisIT.class);
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setWalMode("SYNC");
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+        
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+    EnvFactory.getEnv().getConfig().getCommonConfig().setWalMode("ASYNC");
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.simple.SimpleConsensus");
+  }
+
+  @Test
+  public void testRestart() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(1,1.0)");
+      statement.execute("flush");
+    }
+
+    try {
+      TestUtils.restartDataNodes();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,1.0)");
+    }
+
+    try {
+      TestUtils.restartDataNodes();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(3,1.0)");
+
+      String[] exp = new String[] {"1,1.0", "2,1.0", "3,1.0"};
+      int cnt = 0;
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM 
root.turbine.d1")) {
+        assertNotNull(resultSet);
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testRestartDelete() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,2)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(3,3)");
+    }
+
+    TestUtils.restartDataNodes();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("delete from root.turbine.d1.s1 where time<=1");
+
+      ResultSet resultSet = statement.executeQuery("SELECT s1 FROM 
root.turbine.d1");
+      assertNotNull(resultSet);
+      String[] exp = new String[] {"2,2.0", "3,3.0"};
+      int cnt = 0;
+      try {
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+
+        statement.execute("flush");
+        statement.execute("delete from root.turbine.d1.s1 where time<=2");
+
+        exp = new String[] {"3,3.0"};
+        resultSet = statement.executeQuery("SELECT s1 FROM root.turbine.d1");
+        assertNotNull(resultSet);
+        cnt = 0;
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      } finally {
+        resultSet.close();
+      }
+    }
+  }
+
+  @Test
+  public void testRestartQueryLargerThanEndTime() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,2)");
+    }
+
+    try {
+      TestUtils.restartDataNodes();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(3,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(4,2)");
+    }
+
+    try {
+      TestUtils.restartDataNodes();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] exp =
+          new String[] {
+            "4,2.0",
+          };
+      int cnt = 0;
+      try (ResultSet resultSet =
+          statement.executeQuery("SELECT s1 FROM root.turbine.d1 where time > 
3")) {
+        assertNotNull(resultSet);
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+      assertEquals(1, cnt);
+    }
+  }
+
+  @Test
+  public void testRestartEndTime() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s1) 
values(2,2)");
+    }
+
+    try {
+      TestUtils.restartDataNodes();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine.d1(timestamp,s2) 
values(1,1)");
+      statement.execute("insert into root.turbine.d1(timestamp,s2) 
values(2,2)");
+    }
+
+    try {
+      TestUtils.restartDataNodes();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] exp = new String[] {"1,1.0", "2,2.0"};
+      int cnt = 0;
+      try (ResultSet resultSet = statement.executeQuery("SELECT s2 FROM 
root.turbine.d1")) {
+        assertNotNull(resultSet);
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+      assertEquals(2, cnt);
+    }
+  }
+
+  @Test
+  public void testRecoverWALMismatchDataType() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) 
values(1,1.1,2.2)");
+      statement.execute("delete timeseries root.turbine1.d1.s1");
+      statement.execute(
+          "create timeseries root.turbine1.d1.s1 with datatype=INT32, 
encoding=RLE, compression=SNAPPY");
+    }
+
+    TestUtils.restartDataNodes();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery("select * from 
root.**")) {
+        assertNotNull(resultSet);
+        int cnt = 0;
+        assertEquals(3, resultSet.getMetaData().getColumnCount());
+        while (resultSet.next()) {
+          assertEquals("1", resultSet.getString(1));
+          assertNull(resultSet.getString(2));
+          assertEquals("2.2", resultSet.getString(3));
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    }
+  }
+
+  @Test
+  public void testRecoverWALDeleteSchema() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) 
values(1,1.1,2.2)");
+      statement.execute("delete timeseries root.turbine1.d1.s1");
+    }
+
+    TestUtils.restartDataNodes();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery("select * from 
root.**")) {
+        assertNotNull(resultSet);
+        int cnt = 0;
+        assertEquals(2, resultSet.getMetaData().getColumnCount());
+        while (resultSet.next()) {
+          assertEquals("1", resultSet.getString(1));
+          assertEquals("2.2", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    }
+  }
+
+  @Test
+  public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    long tsFileSize = config.getSeqTsFileSize();
+    long unFsFileSize = config.getSeqTsFileSize();
+    config.setSeqTsFileSize(10000000);
+    config.setUnSeqTsFileSize(10000000);
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("create timeseries root.turbine1.d1.s1 with 
datatype=INT64");
+      statement.execute("insert into root.turbine1.d1(timestamp,s1) 
values(1,1)");
+      statement.execute("insert into root.turbine1.d1(timestamp,s1) 
values(2,1)");
+      statement.execute("flush");
+      statement.execute("create timeseries root.turbine1.d1.s2 with 
datatype=BOOLEAN");
+      statement.execute("insert into root.turbine1.d1(timestamp,s2) 
values(3,true)");
+      statement.execute("insert into root.turbine1.d1(timestamp,s2) 
values(4,true)");
+    }
+
+    TestUtils.restartDataNodes();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      long[] result = new long[] {1L, 2L};
+      ResultSet resultSet =
+          statement.executeQuery("select s1 from root.turbine1.d1 where time < 
3");
+      assertNotNull(resultSet);
+      int cnt = 0;
+      while (resultSet.next()) {
+        assertEquals(resultSet.getLong(1), result[cnt]);
+        cnt++;
+      }
+      assertEquals(2, cnt);
+    }
+
+    config.setSeqTsFileSize(tsFileSize);
+    config.setUnSeqTsFileSize(unFsFileSize);
+  }
+
+  @Test
+  public void testRecoverFromFlushMemTableError() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) 
values(1,1.1,2.2)");
+    }
+
+    // mock exception during flush memtable
+    TestUtils.restartDataNodes();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery("select * from 
root.**")) {
+        assertNotNull(resultSet);
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals("1", resultSet.getString(1));
+          assertEquals("1.1", resultSet.getString(2));
+          assertEquals("2.2", resultSet.getString(3));
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    }
+  }
+}

Reply via email to