This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5174
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6e9a29d205e7ae86296e942a9f12a88971c39120
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Dec 14 16:28:45 2022 +0800

    change version filename && add compatibility solution/test
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../iot/logdispatcher/IndexController.java         | 38 ++++++++++++++++++++--
 .../consensus/iot/logdispatcher/LogDispatcher.java |  3 +-
 .../iot/logdispatcher/IndexControllerTest.java     | 35 ++++++++++++++++----
 .../iot/logdispatcher/SyncStatusTest.java          | 15 ++++++---
 4 files changed, 75 insertions(+), 16 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index e5242c6611..45c9a7f358 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -30,6 +32,8 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** An index controller class to balance the performance degradation of frequent disk I/O. */
@@ -44,16 +48,23 @@ public class IndexController {
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final String storageDir;
+
+  private final Peer peer;
   private final String prefix;
   private final long initialIndex;
 
   private final long checkpointGap;
 
-  public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) {
+  public IndexController(String storageDir, Peer peer, long initialIndex, long checkpointGap) {
     this.storageDir = storageDir;
-    this.prefix = prefix + '-';
+    this.peer = peer;
+    this.prefix = peer.getNodeId() + "-";
     this.checkpointGap = checkpointGap;
     this.initialIndex = initialIndex;
+    // This is because we changed the name of the version file in version 1.0.1. In order to ensure
+    // compatibility with version 1.0.0, we need to add this function. We will remove this function
+    // in the future version 2.x.
+    upgrade();
     restore();
   }
 
@@ -124,6 +135,29 @@ public class IndexController {
     }
   }
 
+  private void upgrade() {
+    File directory = new File(storageDir);
+    String oldPrefix = Utils.fromTEndPointToString(peer.getEndpoint()) + "-";
+    Optional.ofNullable(directory.listFiles((dir, name) -> name.startsWith(oldPrefix)))
+        .ifPresent(
+            files ->
+                Arrays.stream(files)
+                    .forEach(
+                        oldFile -> {
+                          long fileVersion = Long.parseLong(oldFile.getName().split("-")[1]);
+                          File newFile = new File(storageDir, prefix + fileVersion);
+                          try {
+                            logger.info(
+                                "version file upgrade, previous: {}, current: {}",
+                                oldFile.getAbsolutePath(),
+                                newFile.getAbsolutePath());
+                            FileUtils.moveFile(oldFile, newFile);
+                          } catch (IOException e) {
+                            logger.error("Error occurred when upgrading version file", e);
+                          }
+                        }));
+  }
+
   private void restore() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c6eb05aa0a..c89e9f50fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
-import org.apache.iotdb.consensus.ratis.Utils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.thrift.TException;
@@ -204,7 +203,7 @@ public class LogDispatcher {
       this.controller =
           new IndexController(
               impl.getStorageDir(),
-              Utils.fromTEndPointToString(peer.getEndpoint()),
+              peer,
               initialSyncIndex,
               config.getReplication().getCheckpointGap());
       this.syncStatus = new SyncStatus(controller, config);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 183c35e2ee..f3e8a40e18 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
+
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -27,11 +32,14 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 
 public class IndexControllerTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
-  private static final String prefix = "version";
+
+  private static final Peer peer =
+      new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
 
   private static final long CHECK_POINT_GAP = 500;
 
@@ -45,11 +53,10 @@ public class IndexControllerTest {
     FileUtils.deleteFully(storageDir);
   }
 
-  /** test indexController when incrementIntervalAfterRestart == false */
   @Test
-  public void testIncrementIntervalAfterRestart() {
+  public void testRestart() {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -58,7 +65,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -66,7 +73,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -74,7 +81,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -82,4 +89,18 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
   }
+
+  @Test
+  public void testUpgrade() throws IOException {
+    File oldFile =
+        new File(storageDir, Utils.fromTEndPointToString(peer.getEndpoint()) + "-" + 100);
+    Files.createFile(oldFile.toPath());
+
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
+    Assert.assertEquals(100, controller.getCurrentIndex());
+
+    File newFile = new File(storageDir, peer.getNodeId() + "-" + 100);
+    Assert.assertTrue(newFile.exists());
+  }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index b0a85612f7..f9c78622db 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
 
@@ -38,7 +41,9 @@ import java.util.concurrent.ExecutionException;
 public class SyncStatusTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
-  private static final String prefix = "version";
+
+  private static final Peer peer =
+      new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
   private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build();
   private static final long CHECK_POINT_GAP = 500;
 
@@ -56,7 +61,7 @@ public class SyncStatusTest {
   @Test
   public void sequenceTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -86,7 +91,7 @@ public class SyncStatusTest {
   @Test
   public void reverseTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -123,7 +128,7 @@ public class SyncStatusTest {
   @Test
   public void mixedTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -172,7 +177,7 @@ public class SyncStatusTest {
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);

Reply via email to