This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch tiered_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 35d70733c7bc33c53f91074dd7e27cc5013cf441 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed May 24 21:43:06 2023 +0800 refactor TsFile status with migration --- .../execute/task/AbstractCompactionTask.java | 2 +- .../impl/SizeTieredCompactionSelector.java | 4 +--- .../utils/CrossSpaceCompactionCandidate.java | 11 ++--------- .../db/engine/migration/LocalMigrationTask.java | 3 +++ .../iotdb/db/engine/migration/MigrationTask.java | 13 ++++++------- .../db/engine/migration/MigrationTaskManager.java | 22 +--------------------- .../db/engine/migration/RemoteMigrationTask.java | 3 +++ .../db/engine/storagegroup/TsFileResource.java | 21 ++++++++++----------- .../engine/storagegroup/TsFileResourceStatus.java | 2 ++ 9 files changed, 29 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java index 8c91acbf556..3246a133866 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java @@ -23,9 +23,9 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java index c3dddf47402..805abe2f1d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -114,9 +114,7 @@ public class SizeTieredCompactionSelector selectedFileSize = 0L; continue; } - if (currentFile.getStatus() != TsFileResourceStatus.NORMAL - || currentFile.onRemote() - || currentFile.isMigrating()) { + if (currentFile.getStatus() != TsFileResourceStatus.NORMAL) { selectedFileList.clear(); selectedFileSize = 0L; continue; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java index b70cf0f0b0b..26d0ee0b1ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java @@ -142,10 +142,7 @@ public class CrossSpaceCompactionCandidate { private List<TsFileResourceCandidate> filterUnseqResource(List<TsFileResource> unseqResources) { List<TsFileResourceCandidate> ret = new ArrayList<>(); for (TsFileResource resource : unseqResources) { - if (resource.getStatus() != TsFileResourceStatus.NORMAL - || resource.onRemote() - || resource.isMigrating() - || !resource.getTsFile().exists()) { + if (resource.getStatus() != TsFileResourceStatus.NORMAL) { break; } else if (resource.stillLives(ttlLowerBound)) { ret.add(new TsFileResourceCandidate(resource)); @@ -200,11 +197,7 @@ public class CrossSpaceCompactionCandidate { this.selected = false; // although we do the judgement here, the task should be validated before executing because // the status of file may be changed after the task is submitted to queue - this.isValidCandidate = - tsFileResource.getStatus() == TsFileResourceStatus.NORMAL - && !tsFileResource.onRemote() - && !tsFileResource.isMigrating() - && tsFileResource.getTsFile().exists(); + this.isValidCandidate = tsFileResource.getStatus() == TsFileResourceStatus.NORMAL; } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java index 93fcd99f82c..99b0a78c8d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.migration; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,8 @@ public class LocalMigrationTask extends MigrationTask { fsFactory.copyFile(srcModsFile, destModsFile); } tsFileResource.setFile(destTsFile); + tsFileResource.increaseTierLevel(); + tsFileResource.setStatus(TsFileResourceStatus.NORMAL); } catch (Exception e) { logger.error("Fail to copy mods file from local {} to local {}", srcModsFile, destModsFile); destTsFile.delete(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java index a894334a84c..e84a5074803 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java @@ -76,13 +76,12 @@ public abstract class MigrationTask implements Runnable { @Override public void run() { - migrate(); - tsFileResource.increaseTierLevel(); - tsFileResource.setIsMigrating(false); - } - - protected boolean canMigrate() { - return tsFileResource.getStatus() == TsFileResourceStatus.NORMAL; + try { + migrate(); + } finally { + // try to set the final status to NORMAL to avoid migrate failure + tsFileResource.setStatus(TsFileResourceStatus.NORMAL); + } } public abstract void migrate(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java index ede3ecce410..53ea1a5ad4f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java @@ -144,7 +144,7 @@ public class MigrationTaskManager implements IService { private void submitMigrationTask( int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) throws IOException { - if (!checkAndMarkMigrate(sourceTsFile)) { + if (!sourceTsFile.setStatus(TsFileResourceStatus.MIGRATING)) { return; } MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir); @@ -158,26 +158,6 @@ public class MigrationTaskManager implements IService { } } - private boolean checkAndMarkMigrate(TsFileResource tsFile) { - if (canMigrate(tsFile)) { - tsFile.setIsMigrating(true); - if (occupiedByCompaction(tsFile)) { - tsFile.setIsMigrating(false); - return false; - } - return true; - } - return false; - } - - private boolean canMigrate(TsFileResource tsFile) { - return tsFile.getStatus() == TsFileResourceStatus.NORMAL && !tsFile.isMigrating(); - } - - private boolean occupiedByCompaction(TsFileResource tsFile) { - return tsFile.getStatus() != TsFileResourceStatus.NORMAL; - } - private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) { // old time partitions first int res = Long.compare(f1.getTimePartition(), f2.getTimePartition()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java index fa4bbe51cd6..ee209234019 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.migration; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,8 @@ public class RemoteMigrationTask extends MigrationTask { tsFileResource.writeLock(); try { srcFile.delete(); + tsFileResource.increaseTierLevel(); + tsFileResource.setStatus(TsFileResourceStatus.NORMAL_ON_REMOTE); } catch (Exception e) { logger.error("Fail to delete local TsFile {}", srcFile); } finally { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 1e627629c26..4dc2e1bede7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -135,8 +135,6 @@ public class TsFileResource { private volatile int tierLevel = 0; - private volatile boolean isMigrating = false; - private volatile long tsFileSize = -1L; private TsFileProcessor processor; @@ -190,6 +188,9 @@ public class TsFileResource { // This method is invoked when DataNode recovers, so the tierLevel should be calculated when // restarting this.tierLevel = TierManager.getInstance().getFileTierLevel(file); + if (onRemote()) { + this.setAtomicStatus(TsFileResourceStatus.NORMAL_ON_REMOTE); + } } /** Used for compaction to create target files. */ @@ -652,16 +653,8 @@ public class TsFileResource { return getStatus() == TsFileResourceStatus.COMPACTION_CANDIDATE; } - public boolean isMigrating() { - return isMigrating; - } - - public void setIsMigrating(boolean isMigrating) { - this.isMigrating = isMigrating; - } - public boolean onRemote() { - return !file.exists(); + return !isDeleted() && !file.exists(); } private boolean compareAndSetStatus( @@ -682,9 +675,13 @@ public class TsFileResource { switch (status) { case NORMAL: return compareAndSetStatus(TsFileResourceStatus.UNCLOSED, TsFileResourceStatus.NORMAL) + || compareAndSetStatus(TsFileResourceStatus.MIGRATING, TsFileResourceStatus.NORMAL) || compareAndSetStatus(TsFileResourceStatus.COMPACTING, TsFileResourceStatus.NORMAL) || compareAndSetStatus( TsFileResourceStatus.COMPACTION_CANDIDATE, TsFileResourceStatus.NORMAL); + case NORMAL_ON_REMOTE: + return compareAndSetStatus( + TsFileResourceStatus.MIGRATING, TsFileResourceStatus.NORMAL_ON_REMOTE); case UNCLOSED: // TsFile cannot be set back to UNCLOSED so false is always returned return false; @@ -698,6 +695,8 @@ public class TsFileResource { case COMPACTION_CANDIDATE: return compareAndSetStatus( TsFileResourceStatus.NORMAL, TsFileResourceStatus.COMPACTION_CANDIDATE); + case MIGRATING: + return compareAndSetStatus(TsFileResourceStatus.NORMAL, TsFileResourceStatus.COMPACTING); default: return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java index 4c1c95ca7e6..f389492553e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java @@ -22,7 +22,9 @@ public enum TsFileResourceStatus { UNCLOSED, /** The resource in status NORMAL, COMPACTION_CANDIDATE, COMPACTING, DELETED is all CLOSED. */ NORMAL, + NORMAL_ON_REMOTE, COMPACTION_CANDIDATE, COMPACTING, + MIGRATING, DELETED }
