This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_incorrect_version_file_position in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit bbacf1b7caad9986437fa49ca531d7e5e33622a3 Author: 江天 <[email protected]> AuthorDate: Tue Apr 9 23:03:52 2019 +0800 bug fix: version files of different storage groups are placed into the same place. --- .../db/engine/filenode/FileNodeProcessor.java | 20 +++-- .../version/SimpleFileVersionController.java | 24 ++++-- .../version/SimpleFileVersionControllerTest.java | 15 ++-- .../iotdb/db/integration/IoTDBVersionIT.java | 90 ++++++++++++++++++++++ 4 files changed, 124 insertions(+), 25 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 72348b6..6818a79 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -134,7 +134,6 @@ public class FileNodeProcessor extends Processor implements IStatistic { private FileNodeProcessorStore fileNodeProcessorStore; private String fileNodeRestoreFilePath; private final Object fileNodeRestoreLock = new Object(); - private String baseDirPath; // last merge time private long lastMergeTime = -1; private BufferWriteProcessor bufferWriteProcessor = null; @@ -242,16 +241,16 @@ public class FileNodeProcessor extends Processor implements IStatistic { && dirPath.charAt(dirPath.length() - 1) != File.separatorChar) { dirPath = dirPath + File.separatorChar; } - this.baseDirPath = dirPath + processorName; - File dataDir = new File(this.baseDirPath); - if (!dataDir.exists()) { - dataDir.mkdirs(); + + File restoreFolder = new File(dirPath + processorName); + if (!restoreFolder.exists()) { + restoreFolder.mkdirs(); LOGGER.info( - "The data directory of the filenode processor {} doesn't exist. Create new " + + "The restore directory of the filenode processor {} doesn't exist. Create new " + "directory {}", - getProcessorName(), baseDirPath); + getProcessorName(), restoreFolder.getAbsolutePath()); } - fileNodeRestoreFilePath = new File(dataDir, processorName + RESTORE_FILE_SUFFIX).getPath(); + fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX).getPath(); try { fileNodeProcessorStore = readStoreFromDisk(); } catch (FileNodeProcessorException e) { @@ -294,7 +293,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { statMonitor.registerStatistics(statStorageDeltaName, this); } try { - versionController = new SimpleFileVersionController(fileNodeDirPath); + versionController = new SimpleFileVersionController(restoreFolder.getPath()); } catch (IOException e) { throw new FileNodeProcessorException(e); } @@ -1982,7 +1981,6 @@ public class FileNodeProcessor extends Processor implements IStatistic { isMerging == that.isMerging && Objects.equals(fileNodeProcessorStore, that.fileNodeProcessorStore) && Objects.equals(fileNodeRestoreFilePath, that.fileNodeRestoreFilePath) && - Objects.equals(baseDirPath, that.baseDirPath) && Objects.equals(bufferWriteProcessor, that.bufferWriteProcessor) && Objects.equals(overflowProcessor, that.overflowProcessor) && Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) && @@ -2002,7 +2000,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed, lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles, emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging, - numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, baseDirPath, + numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet, newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters, fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction, diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java index 07d5e7d..d9bc1f6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java @@ -22,8 +22,6 @@ package org.apache.iotdb.db.engine.version; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,12 +31,13 @@ import org.slf4j.LoggerFactory; */ public class SimpleFileVersionController implements VersionController { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileVersionController.class); + /** - * Every time currVersion - prevVersion >= SAVE_INTERVAL, currVersion is persisted and prevVersion + * Every time currVersion - prevVersion >= saveInterval, currVersion is persisted and prevVersion * is set to currVersion. When recovering from file, the version number is automatically increased - * by SAVE_INTERVAL to avoid conflicts. + * by saveInterval to avoid conflicts. */ - public static final long SAVE_INTERVAL = 100; + private static long saveInterval = 100; private static final String FILE_PREFIX = "Version-"; private long prevVersion; private long currVersion; @@ -70,7 +69,7 @@ public class SimpleFileVersionController implements VersionController { } private void checkPersist() throws IOException { - if ((currVersion - prevVersion) >= SAVE_INTERVAL) { + if ((currVersion - prevVersion) >= saveInterval) { persist(); } } @@ -79,6 +78,8 @@ public class SimpleFileVersionController implements VersionController { File oldFile = new File(directoryPath, FILE_PREFIX + prevVersion); File newFile = new File(directoryPath, FILE_PREFIX + currVersion); FileUtils.moveFile(oldFile, newFile); + LOGGER.info("Version file updated, previous: {}, current: {}", + oldFile.getAbsolutePath(), newFile.getAbsolutePath()); prevVersion = currVersion; } @@ -109,7 +110,16 @@ public class SimpleFileVersionController implements VersionController { new FileOutputStream(versionFile).close(); } // prevent overlapping in case of failure - currVersion = prevVersion + SAVE_INTERVAL; + currVersion = prevVersion + saveInterval; persist(); } + + // test only method + public static void setSaveInterval(long saveInterval) { + SimpleFileVersionController.saveInterval = saveInterval; + } + + public static long getSaveInterval() { + return saveInterval; + } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java index 0bc062d..cacde79 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java @@ -19,16 +19,15 @@ package org.apache.iotdb.db.engine.version; + +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.IOException; - import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Test; -import static org.apache.iotdb.db.engine.version.SimpleFileVersionController.SAVE_INTERVAL; -import static org.junit.Assert.assertEquals; - public class SimpleFileVersionControllerTest { @Test public void test() throws IOException { @@ -39,13 +38,15 @@ public class SimpleFileVersionControllerTest { Assert.fail("can not create version.tmp folder"); } VersionController versionController = new SimpleFileVersionController(tempFilePath); - assertEquals(SAVE_INTERVAL, versionController.currVersion()); + assertEquals(SimpleFileVersionController.getSaveInterval(), versionController.currVersion()); for (int i = 0; i < 150; i++) { versionController.nextVersion(); } - assertEquals(SAVE_INTERVAL + 150, versionController.currVersion()); + assertEquals(SimpleFileVersionController.getSaveInterval() + 150, + versionController.currVersion()); versionController = new SimpleFileVersionController(tempFilePath); - assertEquals(SAVE_INTERVAL + 200, versionController.currVersion()); + assertEquals(SimpleFileVersionController.getSaveInterval() + 200, + versionController.currVersion()); } finally { FileUtils.deleteDirectory(new File(tempFilePath)); } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBVersionIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBVersionIT.java new file mode 100644 index 0000000..16cdd98 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBVersionIT.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.integration; + +import static org.apache.iotdb.db.integration.Constant.TIMESTAMP_STR; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.iotdb.db.engine.version.SimpleFileVersionController; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.jdbc.IoTDBConnection; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class IoTDBVersionIT { + + private IoTDB deamon; + + @Before + public void setUp() throws Exception { + EnvironmentUtils.closeMemControl(); + deamon = IoTDB.getInstance(); + deamon.active(); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + deamon.stop(); + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testVersionPersist() throws SQLException, ClassNotFoundException { + Class.forName(Config.JDBC_DRIVER_NAME); + try(Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", + "root", "root")){ + Statement statement = connection.createStatement(); + + statement.execute("SET STORAGE GROUP TO root.versionTest1"); + statement.execute("SET STORAGE GROUP TO root.versionTest2"); + statement.execute("CREATE TIMESERIES root.versionTest1.s0" + + " WITH DATATYPE=INT32,ENCODING=PLAIN"); + statement.execute("CREATE TIMESERIES root.versionTest2.s0" + + " WITH DATATYPE=INT32,ENCODING=PLAIN"); + + // write and flush enough times to make the version file persist + for (int i = 0; i < 3 * SimpleFileVersionController.getSaveInterval(); i ++) { + for (int j = 1; j <= 100; j ++) { + statement.execute(String + .format("INSERT INTO root.versionTest1(timestamp, s0) VALUES (%d, %d)", i*100+j, j)); + } + statement.execute("FLUSH"); + } + for (int i = 0; i < 3 * SimpleFileVersionController.getSaveInterval(); i ++) { + for (int j = 1; j <= 100; j ++) { + statement.execute(String + .format("INSERT INTO root.versionTest2(timestamp, s0) VALUES (%d, %d)", i*100+j, j)); + } + statement.execute("FLUSH"); + } + + statement.close(); + } + } +}
