This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_recover_logger_1017
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/compaction_recover_logger_1017
by this push:
new 90ad08d8d57 move some common method to abstract class
90ad08d8d57 is described below
commit 90ad08d8d5701208f1d1c7cc1845263c2ba477fc
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Oct 17 17:49:23 2023 +0800
move some common method to abstract class
---
.../execute/task/AbstractCompactionTask.java | 107 ++++++++++++++++++++
.../execute/task/InnerSpaceCompactionTask.java | 110 +--------------------
2 files changed, 110 insertions(+), 107 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 7d9512780cd..8e378b884c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -19,17 +19,26 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -41,6 +50,9 @@ import java.util.List;
* finished. The future returns the {@link CompactionTaskSummary} of this task
execution.
*/
public abstract class AbstractCompactionTask {
+ protected static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+
protected String dataRegionId;
protected String storageGroupName;
protected long timePartition;
@@ -110,6 +122,8 @@ public abstract class AbstractCompactionTask {
protected abstract boolean doCompaction();
+ protected abstract void recover();
+
protected void printLogWhenException(Logger logger, Exception e) {
if (e instanceof InterruptedException) {
logger.warn("{}-{} [Compaction] Compaction interrupted",
storageGroupName, dataRegionId);
@@ -198,6 +212,99 @@ public abstract class AbstractCompactionTask {
}
}
+ protected boolean checkAllSourceFileExists(List<TsFileResource>
tsFileResources) { // returns true only if every resource passes both existence checks below
+ for (TsFileResource tsFileResource : tsFileResources) {
+ if (!tsFileResource.getTsFile().exists() ||
!tsFileResource.resourceFileExists()) { // getTsFile().exists() or resourceFileExists() reported false
+ return false; // stop at the first resource that fails a check
+ }
+ }
+ return true; // also reached when the list is empty
+ }
+
+ protected void handleRecoverException(Exception e) { // logs the recover failure, disallows compaction on tsFileManager, then sets the node status to ReadOnly
+ LOGGER.error(
+ "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {},
Exception: {}",
+ dataRegionId,
+ this,
+ e);
+ tsFileManager.setAllowCompaction(false);
+ LOGGER.error("stop compaction because of exception during recovering");
+ // Consider whether the system needs to be set to READONLY
+
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
+ }
+
+ protected void insertFilesToTsFileManager(List<TsFileResource> tsFiles)
throws IOException { // hands each resource in tsFiles to tsFileManager.keepOrderInsert, in list order
+ for (TsFileResource tsFileResource : tsFiles) {
+ tsFileManager.keepOrderInsert(tsFileResource, tsFileResource.isSeq()); // second argument is the isSeq() flag of the resource itself
+ }
+ }
+
+ protected void removeTsFileInMemory(List<TsFileResource> resourceList) { // removes every non-null resource from tsFileManager while holding its write lock
+ tsFileManager.writeLock("CompactionExceptionHandler");
+ try {
+ for (TsFileResource targetTsFile : resourceList) {
+ if (targetTsFile == null) {
+ // a null entry means the target file was deleted because it was empty after compaction
+ continue;
+ }
+ tsFileManager.remove(targetTsFile, targetTsFile.isSeq());
+ }
+ } finally {
+ tsFileManager.writeUnlock(); // runs even if remove throws
+ }
+ }
+
+ public File getRealTargetFile(TsFileIdentifier targetFileIdentifier, String
suffix) { // prefers the file located by the identifier itself, otherwise the file at the same path with suffix replaced by TSFILE_SUFFIX
+ File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+ File targetFile =
+ getFileFromDataDirs(
+ targetFileIdentifier.getFilePath().replace(suffix,
TsFileConstant.TSFILE_SUFFIX));
+ return tmpTargetFile != null ? tmpTargetFile : targetFile; // result is null when neither lookup finds a file
+ }
+
+ /**
+ * Finds the File object for the given filePath by resolving it against each
local data directory in turn and returning the first one that exists. If
+ * the file is not found in any data directory, null is returned.
+ */
+ public File getFileFromDataDirs(String filePath) {
+ String[] dataDirs =
IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
+ for (String dataDir : dataDirs) {
+ File f = new File(dataDir, filePath); // filePath is treated as a child path of dataDir
+ if (f.exists()) {
+ return f;
+ }
+ }
+ return null;
+ }
+
+ protected void deleteCompactionModsFile(List<TsFileResource>
tsFileResourceList)
+ throws IOException { // removes the compaction modification file of each resource when that file exists
+ for (TsFileResource seqFile : tsFileResourceList) { // NOTE(review): the local is named seqFile, but nothing here restricts the list to sequence files
+ ModificationFile modificationFile = seqFile.getCompactionModFile();
+ if (modificationFile.exists()) {
+ modificationFile.remove();
+ }
+ }
+ }
+
+ protected boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) { // calls deleteTsFileOnDisk on each resource in list order and returns true only if all calls succeed
+ for (TsFileResource resource : tsFiles) {
+ if (!deleteTsFileOnDisk(resource)) {
+ return false; // stops at the first failure, so later resources are not attempted
+ }
+ }
+ return true;
+ }
+
+ protected boolean deleteTsFileOnDisk(TsFileResource tsFileResource) { // returns the result of tsFileResource.remove(), invoked while holding the write lock of that resource
+ tsFileResource.writeLock();
+ try {
+ return tsFileResource.remove();
+ } finally {
+ tsFileResource.writeUnlock(); // releases the lock on the same resource that was locked above, even if remove throws
+ }
+ }
+
public boolean isTaskRan() { // reports the isRan() flag of the task summary
return summary.isRan();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 2dc544b4b57..47575d8ea11 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
@@ -40,19 +38,15 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.val
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -62,8 +56,6 @@ import java.util.List;
import java.util.Objects;
public class InnerSpaceCompactionTask extends AbstractCompactionTask {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected List<TsFileResource> selectedTsFileResourceList;
protected TsFileResource targetTsFileResource;
@@ -118,7 +110,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
sourceFileIdentifiers.forEach(
f -> this.selectedTsFileResourceList.add(new
TsFileResource(f.getFileFromDataDirs())));
if (targetFileIdentifiers.size() > 0) {
- File targetFileOnDisk = getRealTargetFile(targetFileIdentifiers.get(0));
+ File targetFileOnDisk =
+ getRealTargetFile(
+ targetFileIdentifiers.get(0),
IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX);
// The targetFileOnDisk may be null, but it won't impact the task
recover stage
this.targetTsFileResource = new TsFileResource(targetFileOnDisk);
}
@@ -353,39 +347,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
}
}
- protected void handleRecoverException(Exception e) {
- LOGGER.error(
- "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {},
Exception: {}",
- dataRegionId,
- this,
- e);
- tsFileManager.setAllowCompaction(false);
- LOGGER.error("stop compaction because of exception during recovering");
- // 考虑是否需要将系统设置为 READONLY
-
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
- }
-
- private void removeTsFileInMemory(List<TsFileResource> resourceList) {
- tsFileManager.writeLock("CompactionExceptionHandler");
- try {
- for (TsFileResource targetTsFile : resourceList) {
- if (targetTsFile == null) {
- // target file has been deleted due to empty after compaction
- continue;
- }
- tsFileManager.remove(targetTsFile, targetTsFile.isSeq());
- }
- } finally {
- tsFileManager.writeUnlock();
- }
- }
-
- private void insertFilesToTsFileManager(List<TsFileResource> tsFiles) throws
IOException {
- for (TsFileResource tsFileResource : tsFiles) {
- tsFileManager.keepOrderInsert(tsFileResource, tsFileResource.isSeq());
- }
- }
-
private void rollback() throws Exception {
// if the task has started,
if (recoverMemoryStatus) {
@@ -429,75 +390,10 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
deleteCompactionModsFile(selectedTsFileResourceList);
}
- /**
- * This method find the File object of given filePath by searching it in
every data directory. If
- * the file is not found, it will return null.
- */
- public File getFileFromDataDirs(String filePath) {
- String[] dataDirs =
IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
- for (String dataDir : dataDirs) {
- File f = new File(dataDir, filePath);
- if (f.exists()) {
- return f;
- }
- }
- return null;
- }
-
- public File getRealTargetFile(TsFileIdentifier targetFileIdentifier) {
- File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
- File targetFile =
- getFileFromDataDirs(
- targetFileIdentifier
- .getFilePath()
- .replace(
- IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
TsFileConstant.TSFILE_SUFFIX));
- return tmpTargetFile != null ? tmpTargetFile : targetFile;
- }
-
- public void deleteCompactionModsFile(List<TsFileResource>
tsFileResourceList) throws IOException {
- for (TsFileResource seqFile : tsFileResourceList) {
- ModificationFile modificationFile = seqFile.getCompactionModFile();
- if (modificationFile.exists()) {
- modificationFile.remove();
- }
- }
- }
-
- private boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) {
- for (TsFileResource resource : tsFiles) {
- if (!deleteTsFileOnDisk(resource)) {
- return false;
- }
- }
- return true;
- }
-
- private boolean deleteTsFileOnDisk(TsFileResource tsFileResource) {
- tsFileResource.writeLock();
- try {
- if (!tsFileResource.remove()) {
- return false;
- }
- } finally {
- targetTsFileResource.writeUnlock();
- }
- return true;
- }
-
private boolean shouldRollback() { // true when every selected source file still passes checkAllSourceFileExists
return checkAllSourceFileExists(selectedTsFileResourceList);
}
- private boolean checkAllSourceFileExists(List<TsFileResource>
tsFileResources) {
- for (TsFileResource tsFileResource : tsFileResources) {
- if (!tsFileResource.getTsFile().exists() ||
!tsFileResource.resourceFileExists()) {
- return false;
- }
- }
- return true;
- }
-
@Override
public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
if (!(otherTask instanceof InnerSpaceCompactionTask)) {