Repository: carbondata Updated Branches: refs/heads/branch-1.3 0299dd90a -> 6eb647144
[CARBONDATA-2230][BACKPORT-1.3]Add a path into table path to store lock files and delete useless segment lock files before loading After PR1984 [https://github.com/apache/carbondata/pull/1984] was merged, lock files are no longer deleted on unlock, so many useless lock files accumulate in the table path, especially segment lock files, whose number grows after every batch load. Solution: add a child directory named LockFiles under the table path, in which all lock files are stored; before loading, find all expired segment lock files and delete them, because only segment lock files accumulate, while the other lock files don't. This closes #2076 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6eb64714 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6eb64714 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6eb64714 Branch: refs/heads/branch-1.3 Commit: 6eb6471440aefc8d41bf8fde787c3547f7fd6276 Parents: 0299dd9 Author: Zhang Zhichao <441586...@qq.com> Authored: Thu Mar 8 15:18:09 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Mar 22 21:02:27 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 11 ++++++ .../core/datastore/impl/FileFactory.java | 12 +++++-- .../carbondata/core/locks/CarbonLockUtil.java | 34 +++++++++++++++++++ .../carbondata/core/locks/HdfsFileLock.java | 35 ++++++++++++-------- .../carbondata/core/locks/LocalFileLock.java | 34 ++++++++----------- .../carbondata/core/locks/ZooKeeperLocking.java | 7 ++-- .../carbondata/core/util/CarbonProperties.java | 20 +++++++++++ .../core/util/path/CarbonTablePath.java | 32 +++++++++++++++++- .../carbondata/spark/util/DataLoadingUtil.scala | 2 ++ .../org/apache/spark/util/AlterTableUtil.scala | 4 +-- 10 files changed, 149 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index aa93db8..f210408 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1570,6 +1570,17 @@ public final class CarbonCommonConstants { public static final String CARBON_SHOW_DATAMAPS_DEFAULT = "true"; + /** + * Currently the segment lock files are not deleted immediately when unlock, + * this value indicates the number of hours the segment lock files will be preserved. + */ + @CarbonProperty + public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS = + "carbon.segment.lock.files.preserve.hours"; + + // default value is 2 days + public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT = "48"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 38ed2b7..cbbf9b6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -177,8 +177,7 @@ public final class FileFactory { } /** - * This method checks the given path exists or not and also is it file or - * not if the performFileCheck is true + * This method checks the given 
path exists or not. * * @param filePath - Path * @param fileType - FileType Local/HDFS @@ -187,6 +186,15 @@ public final class FileFactory { return getCarbonFile(filePath).isFileExist(filePath, fileType); } + /** + * This method checks the given path exists or not. + * + * @param filePath - Path + */ + public static boolean isFileExist(String filePath) throws IOException { + return isFileExist(filePath, getFileType(filePath)); + } + public static boolean createNewFile(String filePath, FileType fileType) throws IOException { return createNewFile(filePath, fileType, true, null); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java index c399ef4..5ac2bc9 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java @@ -19,8 +19,13 @@ package org.apache.carbondata.core.locks; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonTablePath; /** * This class contains all carbon lock utilities @@ -107,4 +112,33 @@ public class CarbonLockUtil { } } + /** + * Currently the segment lock files are not deleted immediately when unlock, + * so it needs to delete 
expired lock files before delete loads. + */ + public static void deleteExpiredSegmentLockFiles(CarbonTable carbonTable) { + final long currTime = System.currentTimeMillis(); + final long segmentLockFilesPreservTime = + CarbonProperties.getInstance().getSegmentLockFilesPreserveHours(); + AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier(); + String lockFilesDir = CarbonTablePath + .getLockFilesDirPath(absoluteTableIdentifier.getTablePath()); + CarbonFile[] files = FileFactory.getCarbonFile(lockFilesDir) + .listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile pathName) { + if (CarbonTablePath.isSegmentLockFilePath(pathName.getName())) { + if ((currTime - pathName.getLastModifiedTime()) > segmentLockFilesPreservTime) { + return true; + } + } + return false; + } + } + ); + + for (CarbonFile file : files) { + file.delete(); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java index be98f7d..3c28f9d 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java @@ -22,35 +22,38 @@ import java.io.IOException; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.util.path.CarbonTablePath; /** * This class is used to handle the HDFS File locking. 
- * This is acheived using the concept of acquiring the data out stream using Append option. + * This is achieved using the concept of acquiring the data out stream using Append option. */ public class HdfsFileLock extends AbstractCarbonLock { private static final LogService LOGGER = LogServiceFactory.getLogService(HdfsFileLock.class.getName()); /** - * location hdfs file location + * lockFilePath is the location of the lock file. */ - private String location; + private String lockFilePath; - private DataOutputStream dataOutputStream; + /** + * lockFileDir is the directory of the lock file. + */ + private String lockFileDir; - private static String tmpPath; + private DataOutputStream dataOutputStream; /** * @param lockFileLocation * @param lockFile */ public HdfsFileLock(String lockFileLocation, String lockFile) { - this.location = lockFileLocation - + CarbonCommonConstants.FILE_SEPARATOR + lockFile; - LOGGER.info("HDFS lock path:" + this.location); + this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation); + this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile); + LOGGER.info("HDFS lock path:" + this.lockFilePath); initRetry(); } @@ -58,7 +61,7 @@ public class HdfsFileLock extends AbstractCarbonLock { * @param lockFilePath */ public HdfsFileLock(String lockFilePath) { - this.location = lockFilePath; + this.lockFilePath = lockFilePath; initRetry(); } @@ -75,11 +78,15 @@ public class HdfsFileLock extends AbstractCarbonLock { */ @Override public boolean lock() { try { - if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { - FileFactory.createNewLockFile(location, FileFactory.getFileType(location)); + if (null != this.lockFileDir && + !FileFactory.isFileExist(lockFileDir)) { + FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir)); + } + if (!FileFactory.isFileExist(lockFilePath)) { + FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath)); } - dataOutputStream 
= - FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location)); + dataOutputStream = FileFactory.getDataOutputStreamUsingAppend(lockFilePath, + FileFactory.getFileType(lockFilePath)); return true; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java index 75ea074..a40f8de 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java @@ -25,9 +25,9 @@ import java.nio.channels.OverlappingFileLockException; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.util.path.CarbonTablePath; /** * This class handles the file locking in the local file system. @@ -35,9 +35,14 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; */ public class LocalFileLock extends AbstractCarbonLock { /** - * location is the location of the lock file. + * lockFilePath is the location of the lock file. */ - private String location; + private String lockFilePath; + + /** + * lockFileDir is the directory of the lock file. + */ + private String lockFileDir; /** * fileOutputStream of the local lock file @@ -55,27 +60,18 @@ public class LocalFileLock extends AbstractCarbonLock { private FileLock fileLock; /** - * lock file - */ - private String lockFile; - - private String lockFilePath; - - /** * LOGGER for logging the messages. 
*/ private static final LogService LOGGER = LogServiceFactory.getLogService(LocalFileLock.class.getName()); - - /** * @param lockFileLocation * @param lockFile */ public LocalFileLock(String lockFileLocation, String lockFile) { - this.location = lockFileLocation; - this.lockFile = lockFile; + this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation); + this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile); initRetry(); } @@ -95,13 +91,11 @@ public class LocalFileLock extends AbstractCarbonLock { */ @Override public boolean lock() { try { - if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { - FileFactory.mkdirs(location, FileFactory.getFileType(location)); + if (!FileFactory.isFileExist(lockFileDir)) { + FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir)); } - lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR + - lockFile; - if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) { - FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location)); + if (!FileFactory.isFileExist(lockFilePath)) { + FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath)); } fileOutputStream = new FileOutputStream(lockFilePath); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java index 1de5004..5a055ab 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java @@ -25,6 +25,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; 
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -88,12 +89,12 @@ public class ZooKeeperLocking extends AbstractCarbonLock { */ public ZooKeeperLocking(String lockLocation, String lockFile) { this.lockName = lockFile; - this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation; + this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + + CarbonTablePath.getLockFilesDirPath(lockLocation); initialize(); - this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation - + CarbonCommonConstants.FILE_SEPARATOR + lockFile; + this.lockTypeFolder = tableIdFolder + CarbonCommonConstants.FILE_SEPARATOR + lockFile; try { createBaseNode(); // if exists returns null then path doesnt exist. so creating. http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 667c45c..a2ace69 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1370,4 +1370,24 @@ public final class CarbonProperties { return thresholdSize; } + /** + * Get the number of hours the segment lock files will be preserved. + * It will be converted to microseconds to return. 
+ */ + public long getSegmentLockFilesPreserveHours() { + long preserveSeconds; + try { + int preserveHours = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS, + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT)); + preserveSeconds = preserveHours * 3600 * 1000L; + } catch (NumberFormatException exc) { + LOGGER.error( + "The segment lock files preserv hours is invalid. Using the default value " + + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT); + preserveSeconds = Integer.parseInt( + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT) * 3600 * 1000L; + } + return preserveSeconds; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index cb264c4..5584e45 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -22,6 +22,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.locks.LockUsage; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; @@ -44,6 +45,7 @@ public class CarbonTablePath extends Path { private static final String PARTITION_PREFIX = "Part"; private static final String DATA_PART_PREFIX = "part-"; private static final String BATCH_PREFIX 
= "_batchno"; + private static final String LOCK_DIR = "LockFiles"; public static final String CARBON_DATA_EXT = ".carbondata"; public static final String INDEX_FILE_EXT = ".carbonindex"; @@ -763,4 +765,32 @@ public class CarbonTablePath extends Path { return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments"; } -} \ No newline at end of file + /** + * Get the lock files directory + */ + public static String getLockFilesDirPath(String tablePath) { + return tablePath + CarbonCommonConstants.FILE_SEPARATOR + LOCK_DIR; + } + + /** + * Get the lock file + */ + public static String getLockFilePath(String tablePath, String lockType) { + return getLockFilesDirPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + lockType; + } + + /** + * Get the segment lock file according to table path and segment load name + */ + public static String getSegmentLockFilePath(String tablePath, String loadName) { + return getLockFilesDirPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + addSegmentPrefix(loadName) + LockUsage.LOCK; + } + + /** + * return true if this lock file is a segment lock file otherwise false. 
+ */ + public static boolean isSegmentLockFilePath(String lockFileName) { + return lockFileName.startsWith(SEGMENT_PREFIX) && lockFileName.endsWith(LockUsage.LOCK); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index 6767ef7..a60e593 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -437,6 +437,8 @@ object DataLoadingUtil { } } } + // delete the expired segment lock files + CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable) } private def isUpdationRequired(isForceDeletion: Boolean, http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 8ebd5a9..c59b1a3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import 
org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -112,8 +113,7 @@ object AlterTableUtil { tablePath: String): Unit = { val lockLocation = tablePath locks.zip(locksAcquired).foreach { case (carbonLock, lockType) => - val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR + - lockType + val lockFilePath = CarbonTablePath.getLockFilePath(lockLocation, lockType) if (carbonLock.releaseLockManually(lockFilePath)) { LOGGER.info(s"Alter table lock released successfully: ${ lockType }") } else {