This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5174 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a73a26ef0eec35a50ee76cad0b385c9c4a8992b0 Author: Potato <[email protected]> AuthorDate: Thu Dec 15 10:55:48 2022 +0800 [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458) * change version filename && add compatibility solution/test Signed-off-by: OneSizeFitQuorum <[email protected]> * fix review Signed-off-by: OneSizeFitQuorum <[email protected]> * rename to uppercase Signed-off-by: OneSizeFitQuorum <[email protected]> Co-authored-by: Jinrui.Zhang <[email protected]> --- .../iot/logdispatcher/IndexController.java | 43 ++++++++++++++++++++-- .../consensus/iot/logdispatcher/LogDispatcher.java | 3 +- .../iot/logdispatcher/IndexControllerTest.java | 37 +++++++++++++++---- .../iot/logdispatcher/SyncStatusTest.java | 15 +++++--- 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java index e5242c6611..7630849928 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java @@ -20,6 +20,8 @@ package org.apache.iotdb.consensus.iot.logdispatcher; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.ratis.Utils; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -30,12 +32,16 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; /** An index controller class to balance the performance degradation of frequent disk I/O. */ @ThreadSafe public class IndexController { + public static final String SEPARATOR = "-"; + private final Logger logger = LoggerFactory.getLogger(IndexController.class); private long lastFlushedIndex; @@ -44,16 +50,23 @@ public class IndexController { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final String storageDir; + + private final Peer peer; private final String prefix; private final long initialIndex; private final long checkpointGap; - public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) { + public IndexController(String storageDir, Peer peer, long initialIndex, long checkpointGap) { this.storageDir = storageDir; - this.prefix = prefix + '-'; + this.peer = peer; + this.prefix = peer.getNodeId() + SEPARATOR; this.checkpointGap = checkpointGap; this.initialIndex = initialIndex; + // This is because we changed the name of the version file in version 1.0.1. In order to ensure + // compatibility with version 1.0.0, we need to add this function. We will remove this function + // in the future version 2.x. + upgrade(); restore(); } @@ -124,6 +137,30 @@ public class IndexController { } } + private void upgrade() { + File directory = new File(storageDir); + String oldPrefix = Utils.fromTEndPointToString(peer.getEndpoint()) + SEPARATOR; + Optional.ofNullable(directory.listFiles((dir, name) -> name.startsWith(oldPrefix))) + .ifPresent( + files -> + Arrays.stream(files) + .forEach( + oldFile -> { + String[] splits = oldFile.getName().split(SEPARATOR); + long fileVersion = Long.parseLong(splits[splits.length - 1]); + File newFile = new File(storageDir, prefix + fileVersion); + try { + logger.info( + "version file upgrade, previous: {}, current: {}", + oldFile.getAbsolutePath(), + newFile.getAbsolutePath()); + FileUtils.moveFile(oldFile, newFile); + } catch (IOException e) { + logger.error("Error occurred when upgrading version file", e); + } + })); + } + private void restore() { File directory = new File(storageDir); File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix)); @@ -132,7 +169,7 @@ public class IndexController { long maxVersion = 0; int maxVersionIndex = 0; for (int i = 0; i < versionFiles.length; i++) { - long fileVersion = Long.parseLong(versionFiles[i].getName().split("-")[1]); + long fileVersion = Long.parseLong(versionFiles[i].getName().split(SEPARATOR)[1]); if (fileVersion > maxVersion) { maxVersion = fileVersion; maxVersionIndex = i; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index c6eb05aa0a..c89e9f50fb 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -35,7 +35,6 @@ import org.apache.iotdb.consensus.iot.thrift.TLogBatch; import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq; import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader; import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan; -import org.apache.iotdb.consensus.ratis.Utils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.thrift.TException; @@ -204,7 +203,7 @@ public class LogDispatcher { this.controller = new IndexController( impl.getStorageDir(), - Utils.fromTEndPointToString(peer.getEndpoint()), + peer, initialSyncIndex, config.getReplication().getCheckpointGap()); this.syncStatus = new SyncStatus(controller, config); diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java index 183c35e2ee..7051f47930 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java @@ -19,6 +19,11 @@ package org.apache.iotdb.consensus.iot.logdispatcher; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.ratis.Utils; + import org.apache.ratis.util.FileUtils; import org.junit.After; import org.junit.Assert; @@ -27,11 +32,14 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.nio.file.Files; public class IndexControllerTest { private static final File storageDir = new File("target" + java.io.File.separator + "test"); - private static final String prefix = "version"; + + private static final Peer peer = + new Peer(new DataRegionId(1), 2, new TEndPoint("datanode-1.datanode-svc", 6667)); private static final long CHECK_POINT_GAP = 500; @@ -45,11 +53,10 @@ public class IndexControllerTest { FileUtils.deleteFully(storageDir); } - /** test indexController when incrementIntervalAfterRestart == false */ @Test - public void testIncrementIntervalAfterRestart() { + public void testRestart() { IndexController controller = - new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(0, controller.getCurrentIndex()); Assert.assertEquals(0, controller.getLastFlushedIndex()); @@ -58,7 +65,7 @@ public class IndexControllerTest { Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex()); Assert.assertEquals(0, controller.getLastFlushedIndex()); - controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(0, controller.getCurrentIndex()); Assert.assertEquals(0, controller.getLastFlushedIndex()); @@ -66,7 +73,7 @@ public class IndexControllerTest { Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex()); Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex()); - controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex()); Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex()); @@ -74,7 +81,7 @@ public class IndexControllerTest { Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex()); Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex()); - controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex()); Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex()); @@ -82,4 +89,20 @@ public class IndexControllerTest { Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex()); Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex()); } + + @Test + public void testUpgrade() throws IOException { + File oldFile = + new File( + storageDir, + Utils.fromTEndPointToString(peer.getEndpoint()) + IndexController.SEPARATOR + 100); + Files.createFile(oldFile.toPath()); + + IndexController controller = + new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); + Assert.assertEquals(100, controller.getCurrentIndex()); + + File newFile = new File(storageDir, peer.getNodeId() + IndexController.SEPARATOR + 100); + Assert.assertTrue(newFile.exists()); + } } diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java index b0a85612f7..f9c78622db 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java @@ -19,6 +19,9 @@ package org.apache.iotdb.consensus.iot.logdispatcher; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.iot.thrift.TLogBatch; @@ -38,7 +41,9 @@ import java.util.concurrent.ExecutionException; public class SyncStatusTest { private static final File storageDir = new File("target" + java.io.File.separator + "test"); - private static final String prefix = "version"; + + private static final Peer peer = + new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667)); private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build(); private static final long CHECK_POINT_GAP = 500; @@ -56,7 +61,7 @@ public class SyncStatusTest { @Test public void sequenceTest() throws InterruptedException { IndexController controller = - new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(0, controller.getCurrentIndex()); SyncStatus status = new SyncStatus(controller, config); @@ -86,7 +91,7 @@ public class SyncStatusTest { @Test public void reverseTest() throws InterruptedException { IndexController controller = - new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(0, controller.getCurrentIndex()); Assert.assertEquals(0, controller.getLastFlushedIndex()); @@ -123,7 +128,7 @@ public class SyncStatusTest { @Test public void mixedTest() throws InterruptedException { IndexController controller = - new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(0, controller.getCurrentIndex()); Assert.assertEquals(0, controller.getLastFlushedIndex()); @@ -172,7 +177,7 @@ public class SyncStatusTest { @Test public void waitTest() throws InterruptedException, ExecutionException { IndexController controller = - new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP); + new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP); Assert.assertEquals(0, controller.getCurrentIndex()); SyncStatus status = new SyncStatus(controller, config);
