This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 28ab2d75825cc990aab8322a194e1a4e947f0cba
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu Sep 28 15:11:02 2023 +0800

    tmp save
---
 .../FileCannotTransitToCompactingException.java    | 49 ++++++++++++++++++++++
 .../execute/task/AbstractCompactionTask.java       | 15 +++++++
 .../execute/task/CrossSpaceCompactionTask.java     | 25 +++++++++++
 .../execute/task/InnerSpaceCompactionTask.java     | 23 ++++++++++
 .../compaction/schedule/CompactionWorker.java      | 43 +++++++++++++++----
 .../db/storageengine/rescon/memory/SystemInfo.java |  8 ++--
 6 files changed, 153 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/FileCannotTransitToCompactingException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/FileCannotTransitToCompactingException.java
new file mode 100644
index 00000000000..d14ae4a44db
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/FileCannotTransitToCompactingException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+public class FileCannotTransitToCompactingException extends Exception {
+
+  public FileCannotTransitToCompactingException(TsFileResource f) {
+    super(
+        String.format("TsFile %s cannot transit to COMPACTING. its status: %s", f, f.getStatus()));
+  }
+
+  public FileCannotTransitToCompactingException() {}
+
+  public FileCannotTransitToCompactingException(String message) {
+    super(message);
+  }
+
+  public FileCannotTransitToCompactingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public FileCannotTransitToCompactingException(Throwable cause) {
+    super(cause);
+  }
+
+  public FileCannotTransitToCompactingException(
+      String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 40504650338..245961b382e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
 
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -27,6 +28,10 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -150,6 +155,16 @@ public abstract class AbstractCompactionTask {
    */
   public abstract boolean checkValidAndSetMerging();
 
+  public abstract void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException;
+
+  public abstract long getEstimatedMemoryCost() throws IOException;
+
+  public abstract int getProcessedFileNum();
+
+  public boolean isCompactionAllowed() {
+    return tsFileManager.isAllowCompaction();
+  }
+
   @Override
   public int hashCode() {
     return super.hashCode();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 99236e11962..15cb7d1f05c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
@@ -400,6 +401,30 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
     return addReadLockSuccess;
   }
 
+  @Override
+  public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException {
+    for (TsFileResource f : selectedSequenceFiles) {
+      if (!f.setStatus(TsFileResourceStatus.COMPACTING)) {
+        throw new FileCannotTransitToCompactingException(f);
+      }
+    }
+    for (TsFileResource f : selectedUnsequenceFiles) {
+      if (!f.setStatus(TsFileResourceStatus.COMPACTING)) {
+        throw new FileCannotTransitToCompactingException(f);
+      }
+    }
+  }
+
+  @Override
+  public long getEstimatedMemoryCost() {
+    return memoryCost;
+  }
+
+  @Override
+  public int getProcessedFileNum() {
+    return selectedSequenceFiles.size() + selectedUnsequenceFiles.size();
+  }
+
   private boolean addReadLock(List<TsFileResource> tsFileResourceList) {
     try {
       for (TsFileResource tsFileResource : tsFileResourceList) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 6a22b9fa606..809e618d989 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
@@ -520,6 +521,28 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
     return true;
   }
 
+  @Override
+  public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException {
+    for (TsFileResource f : selectedTsFileResourceList) {
+      if (!f.setStatus(TsFileResourceStatus.COMPACTING)) {
+        throw new FileCannotTransitToCompactingException(f);
+      }
+    }
+  }
+
+  @Override
+  public long getEstimatedMemoryCost() throws IOException {
+    if (memoryCost == 0L) {
+      memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
+    }
+    return memoryCost;
+  }
+
+  @Override
+  public int getProcessedFileNum() {
+    return selectedTsFileResourceList.size();
+  }
+
   @Override
   protected void createSummary() {
     if (performer instanceof FastCompactionPerformer) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index f5bb18b5b54..64be3cdd09f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -19,8 +19,12 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
 
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
 import org.slf4j.Logger;
@@ -28,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.validation.constraints.NotNull;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -55,15 +60,39 @@ public class CompactionWorker implements Runnable {
         log.warn("CompactionThread-{} terminates because interruption", threadId);
         return;
       }
+      if (task == null || !task.isCompactionAllowed()) {
+        log.info("Compaction task is not allowed to be executed by TsFileManager. Task {}", task);
+        return;
+      }
+      long estimatedMemoryCost = 0L;
+      boolean memoryAcquired = false;
+      boolean fileHandleAcquired = false;
       try {
-        if (task != null && task.checkValidAndSetMerging()) {
-          CompactionTaskSummary summary = task.getSummary();
-          CompactionTaskFuture future = new CompactionTaskFuture(summary);
-          CompactionTaskManager.getInstance().recordTask(task, future);
-          task.start();
+        task.transitSourceFilesToMerging();
+        estimatedMemoryCost = task.getEstimatedMemoryCost();
+        memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+        fileHandleAcquired =
+            SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
+        CompactionTaskSummary summary = task.getSummary();
+        CompactionTaskFuture future = new CompactionTaskFuture(summary);
+        CompactionTaskManager.getInstance().recordTask(task, future);
+        task.start();
+      } catch (FileCannotTransitToCompactingException
+          | IOException
+          | CompactionMemoryNotEnoughException
+          | CompactionFileCountExceededException e) {
+        log.info("CompactionTask {} cannot be executed. Reason: {}", task, e);
+      } catch (InterruptedException e) {
+        log.warn("InterruptedException occurred when preparing compaction task. {}", task, e);
+        Thread.currentThread().interrupt();
+      } finally {
+        task.resetCompactionCandidateStatusForAllSourceFiles();
+        if (memoryAcquired) {
+          SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost);
+        }
+        if (fileHandleAcquired) {
+          SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum());
         }
-      } catch (Exception e) {
-        log.error("CompactionWorker.run(), Exception.", e);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 50cbe144f61..334575f26e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -188,7 +188,7 @@ public class SystemInfo {
     this.flushingMemTablesCost -= flushingMemTableCost;
   }
 
-  public void addCompactionFileNum(int fileNum, long timeOutInSecond)
+  public boolean addCompactionFileNum(int fileNum, long timeOutInSecond)
       throws InterruptedException, CompactionFileCountExceededException {
     if (fileNum > totalFileLimitForCrossTask) {
       // source file num is greater than the max file num for compaction
@@ -210,12 +210,13 @@ public class SystemInfo {
       Thread.sleep(100);
       originFileNum = this.compactionFileNumCost.get();
     }
+    return true;
   }
 
-  public void addCompactionMemoryCost(long memoryCost, long timeOutInSecond)
+  public boolean addCompactionMemoryCost(long memoryCost, long timeOutInSecond)
       throws InterruptedException, CompactionMemoryNotEnoughException {
     if (!config.isEnableCompactionMemControl()) {
-      return;
+      return false;
     }
     if (memoryCost > memorySizeForCompaction) {
       // required memory cost is greater than the total memory budget for compaction
@@ -239,6 +240,7 @@ public class SystemInfo {
       Thread.sleep(100);
       originSize = this.compactionMemoryCost.get();
     }
+    return true;
   }
 
   public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) {

Reply via email to