This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dynamic_compaction by this push:
     new 59217c5  fix full merge in dynamic compaction
59217c5 is described below

commit 59217c5320c0d399292c02897222a9d8d87245ab
Author: EJTTianyu <[email protected]>
AuthorDate: Fri May 21 11:46:41 2021 +0800

    fix full merge in dynamic compaction
---
 .../HitterLevelCompactionTsFileManagement.java     |  6 ++--
 .../engine/heavyhitter/hitter/HashMapHitter.java   | 34 +++++++++++++++-------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 6292903..2761feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -227,7 +227,7 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
           }
         }
       }
-      List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 2));
+      List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 1));
       FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, timePartition);
       new Thread(fullMergeTask).start();
     } catch (Exception e) {
@@ -463,8 +463,8 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
                 new HashSet<>(), true);
         writeLock();
         try {
-          sequenceTsFileResources.get(timePartitionId).get(seqLevelNum - 1).add(newResource);
-          deleteLevelFilesInList(timePartitionId, mergeFileLst, seqLevelNum - 2, true);
+          sequenceTsFileResources.get(timePartitionId).get(seqLevelNum).add(newResource);
+          deleteLevelFilesInList(timePartitionId, mergeFileLst, seqLevelNum - 1, true);
         } finally {
           writeUnlock();
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
index cfeeae8..8f98118 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.PriorityQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -42,6 +44,7 @@ import org.slf4j.LoggerFactory;
 public class HashMapHitter implements QueryHeavyHitters {
 
   private static final Logger logger = LoggerFactory.getLogger(HashMapHitter.class);
+  private final ReadWriteLock hitterLock = new ReentrantReadWriteLock();
   int hitter = IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum();
   private Map<PartialPath, Integer> counter = new HashMap<>();
   private PriorityQueue<Entry<PartialPath, Integer>> topHeap = new PriorityQueue<>(hitter,
@@ -58,23 +61,34 @@ public class HashMapHitter implements QueryHeavyHitters {
 
   @Override
   public void acceptQuerySeries(PartialPath queryPath) {
-    counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
+    hitterLock.writeLock().lock();
+    try {
+      counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
+    } finally {
+      hitterLock.writeLock().unlock();
+    }
   }
 
   @Override
   public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws MetadataException {
-    List<PartialPath> ret = new ArrayList<>();
-    topHeap.addAll(counter.entrySet());
-    List<PartialPath> sgPaths = MManager.getInstance().getAllTimeseriesPath(sgName);
-    for (int k = 0; k < hitter; k++) {
-      if (!topHeap.isEmpty()) {
-        PartialPath path =  topHeap.poll().getKey();
-        if (sgPaths.contains(path)) {
-          ret.add(path);
+    hitterLock.readLock().lock();
+    try {
+      List<PartialPath> ret = new ArrayList<>();
+      topHeap.addAll(counter.entrySet());
+      List<PartialPath> sgPaths = MManager.getInstance().getAllTimeseriesPath(sgName);
+      for (int k = 0; k < hitter; k++) {
+        if (!topHeap.isEmpty()) {
+          PartialPath path = topHeap.poll().getKey();
+          if (sgPaths.contains(path)) {
+            ret.add(path);
+          }
         }
       }
+      topHeap.clear();
+      return ret;
+    } finally {
+      hitterLock.readLock().unlock();
     }
-    return ret;
   }
 
   /**

Reply via email to