This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d4ccd578da [IOTDB-2931] Remove access to metadata manager during compaction (#5572)
d4ccd578da is described below

commit d4ccd578da58505289ca921e19bbe2c53ffc4a47
Author: Liu Xuxin <[email protected]>
AuthorDate: Wed Jun 1 20:52:31 2022 +0800

    [IOTDB-2931] Remove access to metadata manager during compaction (#5572)
---
 .../engine/compaction/CompactionTaskManager.java   |   7 ++
 .../rewrite/task/ReadPointPerformerSubTask.java    |  26 ++----
 .../compaction/inner/InnerSpaceCompactionTask.java |   4 +-
 .../utils/AlignedSeriesCompactionExecutor.java     |  24 ++++-
 .../inner/utils/MultiTsFileDeviceIterator.java     |   3 +
 .../utils/SingleSeriesCompactionExecutor.java      |  34 +++++++
 .../impl/ReadChunkCompactionPerformer.java         |  18 +---
 .../impl/ReadPointCompactionPerformer.java         | 104 +++++++++++++++++----
 .../compaction/CompactionTaskManagerTest.java      |  21 ++++-
 .../compaction/inner/InnerUnseqCompactionTest.java |   2 +
 .../ReadChunkCompactionPerformerNoAlignedTest.java |   1 -
 11 files changed, 180 insertions(+), 64 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 0523fc26ff..385e23aa0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -404,6 +404,13 @@ public class CompactionTaskManager implements IService {
                   + " seconds for all sub compaction tasks to finish.");
         }
       }
+      if (this.subCompactionTaskExecutionPool != null) {
+        subCompactionTaskExecutionPool.shutdownNow();
+        if (!this.subCompactionTaskExecutionPool.awaitTermination(
+            MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
+          throw new RuntimeException("Failed to shutdown subCompactionTaskExecutionPool");
+        }
+      }
       this.taskExecutionPool =
           (WrappedScheduledExecutorService)
               IoTDBThreadPoolFactory.newScheduledThreadPool(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index 780a04e01e..d22362746c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -19,24 +19,20 @@
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -53,6 +49,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
   private final QueryContext queryContext;
   private final QueryDataSource queryDataSource;
   private final AbstractCompactionWriter compactionWriter;
+  private final Map<String, MeasurementSchema> schemaMap;
   private final int taskId;
 
   public ReadPointPerformerSubTask(
@@ -61,31 +58,22 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
       QueryContext queryContext,
       QueryDataSource queryDataSource,
       AbstractCompactionWriter compactionWriter,
+      Map<String, MeasurementSchema> schemaMap,
       int taskId) {
     this.device = device;
     this.measurementList = measurementList;
     this.queryContext = queryContext;
     this.queryDataSource = queryDataSource;
     this.compactionWriter = compactionWriter;
+    this.schemaMap = schemaMap;
     this.taskId = taskId;
   }
 
   @Override
   public Void call() throws Exception {
     for (String measurement : measurementList) {
-      List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-      try {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-          measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
-        } else {
-          measurementSchemas.add(
-              IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement)));
-        }
-      } catch (PathNotExistException e) {
-        logger.info("A deleted path is skipped: {}", e.getMessage());
-        continue;
-      }
-
+      List<IMeasurementSchema> measurementSchemas =
+          Collections.singletonList(schemaMap.get(measurement));
       IBatchReader dataBatchReader =
           ReadPointCompactionPerformer.constructReader(
               device,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index c519697162..4b2760ce5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -202,7 +202,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
       // catch throwable to handle OOM errors
       if (!(throwable instanceof InterruptedException)) {
         LOGGER.error(
-            "{} [Compaction] Meet errors in inner space compaction.", fullStorageGroupName);
+            "{} [Compaction] Meet errors in inner space compaction.",
+            fullStorageGroupName,
+            throwable);
       }
 
       // handle exception
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index af5353153f..8ee99d9e1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -19,15 +19,19 @@
 package org.apache.iotdb.db.engine.compaction.inner.utils;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.compaction.CompactionMetricsManager;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -69,7 +73,8 @@ public class AlignedSeriesCompactionExecutor {
       String device,
       TsFileResource targetResource,
       LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter writer) {
+      TsFileIOWriter writer)
+      throws IOException {
     this.device = device;
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.writer = writer;
@@ -86,11 +91,13 @@ public class AlignedSeriesCompactionExecutor {
    * @return
    */
   private List<IMeasurementSchema> collectSchemaFromAlignedChunkMetadataList(
-      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
-          readerAndChunkMetadataList) {
+      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> 
readerAndChunkMetadataList)
+      throws IOException {
     Set<MeasurementSchema> schemaSet = new HashSet<>();
+    Set<String> measurementSet = new HashSet<>();
     for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair 
:
         readerAndChunkMetadataList) {
+      TsFileSequenceReader reader = readerListPair.left;
       List<AlignedChunkMetadata> alignedChunkMetadataList = 
readerListPair.right;
       for (AlignedChunkMetadata alignedChunkMetadata : 
alignedChunkMetadataList) {
         List<IChunkMetadata> valueChunkMetadataList =
@@ -99,9 +106,18 @@ public class AlignedSeriesCompactionExecutor {
           if (chunkMetadata == null) {
             continue;
           }
+          if (measurementSet.contains(chunkMetadata.getMeasurementUid())) {
+            continue;
+          }
+          measurementSet.add(chunkMetadata.getMeasurementUid());
+          Chunk chunk = ChunkCache.getInstance().get((ChunkMetadata) 
chunkMetadata);
+          ChunkHeader header = chunk.getHeader();
           schemaSet.add(
               new MeasurementSchema(
-                  chunkMetadata.getMeasurementUid(), 
chunkMetadata.getDataType()));
+                  header.getMeasurementID(),
+                  header.getDataType(),
+                  header.getEncodingType(),
+                  header.getCompressionType()));
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index be6d2c8691..73f3c2ca45 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -181,6 +181,9 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
       TsFileSequenceReader reader = readerMap.get(tsFileResource);
       List<AlignedChunkMetadata> alignedChunkMetadataList =
           reader.getAlignedChunkMetadata(currentDevice.left);
+      if (alignedChunkMetadataList.size() > 0) {
+        alignedChunkMetadataList.forEach(x -> 
x.setFilePath(tsFileResource.getTsFilePath()));
+      }
       applyModificationForAlignedChunkMetadataList(tsFileResource, 
alignedChunkMetadataList);
       readerAndChunkMetadataList.add(new Pair<>(reader, 
alignedChunkMetadataList));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index dd1dbea024..1bba81dd04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.engine.compaction.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -36,6 +37,7 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -47,6 +49,7 @@ import java.util.List;
 /** This class is used to compact one series during inner space compaction. */
 public class SingleSeriesCompactionExecutor {
   private String device;
+  private PartialPath series;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList;
   private TsFileIOWriter fileWriter;
   private TsFileResource targetResource;
@@ -80,6 +83,7 @@ public class SingleSeriesCompactionExecutor {
       TsFileIOWriter fileWriter,
       TsFileResource targetResource) {
     this.device = series.getDevice();
+    this.series = series;
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.fileWriter = fileWriter;
     this.schema = measurementSchema;
@@ -89,6 +93,22 @@ public class SingleSeriesCompactionExecutor {
     this.targetResource = targetResource;
   }
 
+  public SingleSeriesCompactionExecutor(
+      PartialPath series,
+      LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
+      TsFileIOWriter fileWriter,
+      TsFileResource targetResource) {
+    this.device = series.getDevice();
+    this.series = series;
+    this.readerAndChunkMetadataList = readerAndChunkMetadataList;
+    this.fileWriter = fileWriter;
+    this.schema = null;
+    this.chunkWriter = null;
+    this.cachedChunk = null;
+    this.cachedChunkMetadata = null;
+    this.targetResource = targetResource;
+  }
+
   /**
    * This function execute the compaction of a single time series. Notice, the 
result of single
    * series compaction may contain more than one chunk.
@@ -101,6 +121,9 @@ public class SingleSeriesCompactionExecutor {
       List<ChunkMetadata> chunkMetadataList = readerListPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         Chunk currentChunk = reader.readMemChunk(chunkMetadata);
+        if (this.chunkWriter == null) {
+          constructChunkWriterFromReadChunk(currentChunk);
+        }
         CompactionMetricsManager.recordReadInfo(
             currentChunk.getHeader().getSerializedSize() + 
currentChunk.getHeader().getDataSize());
 
@@ -135,6 +158,17 @@ public class SingleSeriesCompactionExecutor {
     targetResource.updateEndTime(device, maxEndTimestamp);
   }
 
+  private void constructChunkWriterFromReadChunk(Chunk chunk) {
+    ChunkHeader chunkHeader = chunk.getHeader();
+    this.schema =
+        new MeasurementSchema(
+            series.getMeasurement(),
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    this.chunkWriter = new ChunkWriterImpl(this.schema);
+  }
+
   private long getChunkSize(Chunk chunk) {
     return chunk.getHeader().getSerializedSize() + 
chunk.getHeader().getDataSize();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index e8ca02bbc7..2a3d50d9ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -21,16 +21,12 @@ package 
org.apache.iotdb.db.engine.compaction.performer.impl;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
 import org.apache.iotdb.db.engine.compaction.performer.ISeqCompactionPerformer;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.idtable.IDTableManager;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -141,20 +137,8 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
       // dead-loop.
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
-      try {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-          measurementSchema =
-              IDTableManager.getInstance().getSeriesSchema(device, 
p.getMeasurement());
-        } else {
-          measurementSchema = IoTDB.schemaProcessor.getSeriesSchema(p);
-        }
-      } catch (PathNotExistException e) {
-        LOGGER.info("A deleted path is skipped: {}", e.getMessage());
-        continue;
-      }
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
-          new SingleSeriesCompactionExecutor(
-              p, measurementSchema, readerAndChunkMetadataList, writer, 
targetResource);
+          new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, 
writer, targetResource);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 3dc0c5ab11..499868e80b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerformerSubTask;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
@@ -32,23 +33,27 @@ import 
org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
 import org.apache.iotdb.db.engine.compaction.writer.InnerSpaceCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
@@ -57,7 +62,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +79,7 @@ public class ReadPointCompactionPerformer
   private List<TsFileResource> unseqFiles = Collections.emptyList();
   private static final int subTaskNum =
       IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
+  private Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new 
HashMap<>();
 
   private List<TsFileResource> targetFiles = Collections.emptyList();
 
@@ -127,6 +135,7 @@ public class ReadPointCompactionPerformer
       updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter);
       updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
     } finally {
+      clearReaderCache();
       QueryResourceManager.getInstance().endQuery(queryId);
     }
   }
@@ -146,19 +155,8 @@ public class ReadPointCompactionPerformer
     MultiTsFileDeviceIterator.AlignedMeasurementIterator 
alignedMeasurementIterator =
         deviceIterator.iterateAlignedSeries(device);
     Set<String> allMeasurements = 
alignedMeasurementIterator.getAllMeasurements();
-    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-    for (String measurement : allMeasurements) {
-      try {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-          
measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, 
measurement));
-        } else {
-          measurementSchemas.add(
-              IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, 
measurement)));
-        }
-      } catch (PathNotExistException e) {
-        LOGGER.info("A deleted path is skipped: {}", e.getMessage());
-      }
-    }
+    Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, 
allMeasurements);
+    List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>(schemaMap.values());
     if (measurementSchemas.isEmpty()) {
       return;
     }
@@ -192,11 +190,12 @@ public class ReadPointCompactionPerformer
       AbstractCompactionWriter compactionWriter,
       QueryContext queryContext,
       QueryDataSource queryDataSource)
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, IllegalPathException {
     MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
         deviceIterator.iterateNotAlignedSeries(device, false);
     Set<String> allMeasurements = measurementIterator.getAllMeasurements();
     int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+    Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, 
allMeasurements);
 
     // assign all measurements to different sub tasks
     Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
@@ -221,6 +220,7 @@ public class ReadPointCompactionPerformer
                       queryContext,
                       queryDataSource,
                       compactionWriter,
+                      schemaMap,
                       i)));
     }
 
@@ -237,6 +237,76 @@ public class ReadPointCompactionPerformer
     compactionWriter.endChunkGroup();
   }
 
+  private Map<String, MeasurementSchema> getMeasurementSchema(
+      String device, Set<String> measurements) throws IllegalPathException, 
IOException {
+    HashMap<String, MeasurementSchema> schemaMap = new HashMap<>();
+    List<TsFileResource> allResources = new LinkedList<>(seqFiles);
+    allResources.addAll(unseqFiles);
+    // sort the tsfile by version, so that we can iterate the tsfile from the newest to oldest
+    allResources.sort(
+        (o1, o2) -> {
+          try {
+            TsFileNameGenerator.TsFileName n1 =
+                TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+            TsFileNameGenerator.TsFileName n2 =
+                TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+            return (int) (n2.getVersion() - n1.getVersion());
+          } catch (IOException e) {
+            return 0;
+          }
+        });
+    for (String measurement : measurements) {
+      for (TsFileResource tsFileResource : allResources) {
+        if (!tsFileResource.mayContainsDevice(device)) {
+          continue;
+        }
+        MeasurementSchema schema =
+            getMeasurementSchemaFromReader(
+                tsFileResource,
+                readerCacheMap.computeIfAbsent(
+                    tsFileResource,
+                    x -> {
+                      try {
+                        FileReaderManager.getInstance().increaseFileReaderReference(x, true);
+                        return FileReaderManager.getInstance().get(x.getTsFilePath(), true);
+                      } catch (IOException e) {
+                        throw new RuntimeException(
+                            String.format(
+                                "Failed to construct sequence reader for %s", tsFileResource));
+                      }
+                    }),
+                device,
+                measurement);
+        if (schema != null) {
+          schemaMap.put(measurement, schema);
+          break;
+        }
+      }
+    }
+    return schemaMap;
+  }
+
+  private MeasurementSchema getMeasurementSchemaFromReader(
+      TsFileResource resource, TsFileSequenceReader reader, String device, 
String measurement)
+      throws IllegalPathException, IOException {
+    List<ChunkMetadata> chunkMetadata =
+        reader.getChunkMetadataList(new PartialPath(device, measurement));
+    if (chunkMetadata.size() > 0) {
+      chunkMetadata.get(0).setFilePath(resource.getTsFilePath());
+      Chunk chunk = ChunkCache.getInstance().get(chunkMetadata.get(0));
+      ChunkHeader header = chunk.getHeader();
+      return new MeasurementSchema(
+          measurement, header.getDataType(), header.getEncodingType(), 
header.getCompressionType());
+    }
+    return null;
+  }
+
+  private void clearReaderCache() throws IOException {
+    for (TsFileResource resource : readerCacheMap.keySet()) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(resource, 
true);
+    }
+  }
+
   private static void updateDeviceStartTimeAndEndTime(
       List<TsFileResource> targetResources, AbstractCompactionWriter 
compactionWriter) {
     List<TsFileIOWriter> targetFileWriters = 
compactionWriter.getFileIOWriter();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 9c1ca40f7d..df68f4eb07 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -40,8 +40,11 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.fail;
+
 public class CompactionTaskManagerTest extends InnerCompactionTest {
   static final Logger logger = 
LoggerFactory.getLogger(CompactionTaskManagerTest.class);
   File tempSGDir;
@@ -120,7 +123,7 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
         logger.warn("{}", manager.getRunningCompactionTaskList());
       }
       if (waitingTime > MAX_WAITING_TIME) {
-        Assert.fail();
+        fail();
       }
     }
   }
@@ -179,7 +182,7 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
         logger.warn("{}", 
CompactionTaskManager.getInstance().getRunningCompactionTaskList());
       }
       if (waitingTime > MAX_WAITING_TIME) {
-        Assert.fail();
+        fail();
       }
     }
     for (TsFileResource resource : seqResources) {
@@ -232,7 +235,7 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
           logger.warn("{}", manager.getRunningCompactionTaskList());
         }
         if (waitingTime > MAX_WAITING_TIME) {
-          Assert.fail();
+          fail();
         }
       }
     }
@@ -278,7 +281,7 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
         logger.warn("{}", manager.getRunningCompactionTaskList());
       }
       if (waitingTime > MAX_WAITING_TIME) {
-        Assert.fail();
+        fail();
       }
     }
   }
@@ -313,6 +316,7 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
   public void testRewriteCrossCompactionFileStatus() throws Exception {
     TsFileManager tsFileManager =
         new TsFileManager("root.compactionTest", "0", 
tempSGDir.getAbsolutePath());
+    seqResources = seqResources.subList(1, 5);
     tsFileManager.addAll(seqResources, true);
     tsFileManager.addAll(unseqResources, false);
     CrossSpaceCompactionTask task =
@@ -340,7 +344,14 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
     }
 
     CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
-    CompactionTaskManager.getInstance().waitAllCompactionFinish();
+    long waitingTime = 0;
+    while (!task.isTaskFinished()) {
+      TimeUnit.MILLISECONDS.sleep(200);
+      waitingTime += 200;
+      if (waitingTime > 10_000) {
+        fail();
+      }
+    }
     for (TsFileResource resource : seqResources) {
       Assert.assertFalse(resource.isCompactionCandidate());
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index 403ddcc51e..e0419f721b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -110,11 +110,13 @@ public class InnerUnseqCompactionTest {
           Collections.emptyMap());
     }
     Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
+    EnvironmentUtils.envSetUp();
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
     new CompactionConfigRestorer().restoreCompactionConfig();
+    EnvironmentUtils.cleanEnv();
     CompactionClearUtils.clearAllCompactionFiles();
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
index a88a6e57d5..fe9f9ef987 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
@@ -483,7 +483,6 @@ public class ReadChunkCompactionPerformerNoAlignedTest {
       for (String path : fullPathSetWithDeleted) {
         chunkPagePointsNumMerged.put(path, chunkPointsArray);
       }
-      chunkPagePointsNumMerged.put(deletedPath, null);
       CompactionCheckerUtils.checkChunkAndPage(chunkPagePointsNumMerged, 
targetResource);
       Map<PartialPath, List<TimeValuePair>> compactedData =
           CompactionCheckerUtils.getDataByQuery(

Reply via email to