This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_many_bugs
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 230dd5419242ded39431ba88efa554fa76d4eaea
Author: qiaojialin <[email protected]>
AuthorDate: Tue Mar 31 19:10:38 2020 +0800

    fix recover bugs
---
 .../main/java/org/apache/iotdb/SessionExample.java |  1 -
 .../engine/storagegroup/StorageGroupProcessor.java |  4 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 53 ++++++++-------
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |  4 ++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 18 ++----
 .../write/writer/RestorableTsFileIOWriter.java     | 30 +++++----
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 75 +++++++++++++---------
 7 files changed, 102 insertions(+), 83 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 33e3090..70b5411 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.record.RowBatch;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.Schema;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e006118..97c2be9 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -408,7 +408,7 @@ public class StorageGroupProcessor {
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
-        workUnsequenceTsFileProcessors
+        workSequenceTsFileProcessors
             .put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -437,6 +437,8 @@ public class StorageGroupProcessor {
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::unsequenceFlushCallback, false, writer);
+        workUnsequenceTsFileProcessors
+            .put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileProcessor.setTimeRangeId(timePartitionId);
         writer.makeMetadataVisible();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index f2ef3ec..4c782af 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -571,9 +571,10 @@ public class PlanExecutor implements IPlanExecutor {
                 file.getAbsolutePath()));
       }
       Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
-      Map<Path, List<ChunkMetadata>> chunkMetaDataListMap = new HashMap<>();
+
+      List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList = new ArrayList<>();
       try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
-        reader.selfCheck(schemaMap, chunkMetaDataListMap, false);
+        reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
       }
 
       FileLoaderUtils.checkTsFileResource(tsFileResource);
@@ -586,7 +587,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       //create schemas if they doesn't exist
       if (plan.isAutoCreateSchema()) {
-        createSchemaAutomatically(chunkMetaDataListMap, schemaMap, plan.getSgLevel());
+        createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
       }
 
       StorageEngine.getInstance().loadNewTsFile(tsFileResource);
@@ -596,36 +597,34 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  private void createSchemaAutomatically(Map<Path, List<ChunkMetadata>> chunkMetaDataListMap,
+  private void createSchemaAutomatically(
+      List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList,
       Map<Path, MeasurementSchema> knownSchemas, int sgLevel)
       throws QueryProcessException, MetadataException {
-    if (chunkMetaDataListMap.isEmpty()) {
+    if (chunkGroupMetadataList.isEmpty()) {
       return;
     }
-    for (Entry<Path, List<ChunkMetadata>> entry : chunkMetaDataListMap.entrySet()) {
-      String device = entry.getKey().getDevice();
+
+    Set<Path> registeredSeries = new HashSet<>();
+    for (Pair<String, List<ChunkMetadata>> chunkGroupMetadata : chunkGroupMetadataList) {
+      String device = chunkGroupMetadata.left;
       MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
-      for (ChunkMetadata chunkMetaData : entry.getValue()) {
-        String measurement = chunkMetaData.getMeasurementUid();
-        String fullPath = device + IoTDBConstant.PATH_SEPARATOR + measurement;
-        MeasurementSchema schema = knownSchemas.get(entry.getKey());
-        if (schema == null) {
-          throw new MetadataException(String
-              .format("Can not get the schema of measurement [%s]", measurement));
-        }
-        if (!node.hasChild(measurement)) {
-          try {
-            mManager.createTimeseries(fullPath, schema.getType(), schema.getEncodingType(),
-                schema.getCompressor(), Collections.emptyMap());
-          } catch (MetadataException e) {
-            if (!e.getMessage().contains("already exist")) {
-              throw e;
-            }
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.right) {
+        Path series = new Path(chunkGroupMetadata.left, chunkMetadata.getMeasurementUid());
+        if (!registeredSeries.contains(series)) {
+          registeredSeries.add(series);
+          MeasurementSchema schema = knownSchemas.get(series);
+          if (schema == null) {
+            throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
+                    chunkMetadata.getMeasurementUid()));
+          }
+          if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
+            mManager.createTimeseries(series.getFullPath(), schema.getType(),
+                schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
+          } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
+            throw new QueryProcessException(
+                String.format("Current Path is not leaf node. %s", series));
           }
-        }
-        if (node.getChild(measurement) instanceof InternalMNode) {
-          throw new QueryProcessException(
-              String.format("Current Path is not leaf node. %s", fullPath));
         }
       }
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 8480e4a..3c2a38b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -215,6 +215,10 @@ public class SeqTsFileRecoverTest {
         versionController, resource, true, true);
     ActiveTimeSeriesCounter.getInstance().init(storageGroup);
     RestorableTsFileIOWriter writer = performer.recover();
+
+    writer.makeMetadataVisible();
+    assertEquals(11, writer.getMetadatasForQuery().size());
+
     assertTrue(writer.canWrite());
     writer.endFile();
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 93354a6..af37244 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -563,7 +563,7 @@ public class TsFileSequenceReader implements AutoCloseable {
    *
    * @param newSchema   @OUT. the measurement schema in the file will be added into this parameter.
    *                    (can be null)
-   * @param chunkMetadataListMap   @OUT. the treeMap (Path -> ChunkmetadataList)
+   * @param chunkGroupMetadataList   @OUT. the treeMap (Path -> ChunkmetadataList)
    *                    (can be null)
    * @param fastFinish  if true and the file is complete, then newSchema and newMetaData parameter
    *                    will be not modified.
@@ -572,7 +572,7 @@ public class TsFileSequenceReader implements AutoCloseable {
    */
 
   public long selfCheck(Map<Path, MeasurementSchema> newSchema,
-      Map<Path, List<ChunkMetadata>> chunkMetadataListMap,
+      List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList,
       boolean fastFinish) throws IOException {
     File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
     long fileSize;
@@ -586,7 +586,8 @@ public class TsFileSequenceReader implements AutoCloseable {
     TSDataType dataType;
     long fileOffsetOfChunk;
 
-    List<ChunkMetadata> chunks = null;
+    // ChunkMetadata of current ChunkGroup
+    List<ChunkMetadata> chunkMetadataList = null;
     String deviceID;
 
     int position = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
@@ -622,7 +623,7 @@ public class TsFileSequenceReader implements AutoCloseable {
             // this is the first chunk of a new ChunkGroup.
             if (newChunkGroup) {
               newChunkGroup = false;
-              chunks = new ArrayList<>();
+              chunkMetadataList = new ArrayList<>();
             }
             fileOffsetOfChunk = this.position() - 1;
             // if there is something wrong with a chunk, we will drop the whole ChunkGroup
@@ -654,7 +655,7 @@ public class TsFileSequenceReader implements AutoCloseable {
             }
             currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk,
                 chunkStatistics);
-            chunks.add(currentChunk);
+            chunkMetadataList.add(currentChunk);
             chunkCnt++;
             break;
           case MetaMarker.CHUNK_GROUP_FOOTER:
@@ -669,12 +670,7 @@ public class TsFileSequenceReader implements AutoCloseable {
                 newSchema.putIfAbsent(new Path(deviceID, tsSchema.getMeasurementId()), tsSchema);
               }
             }
-            if (chunkMetadataListMap != null) {
-              for (ChunkMetadata chunk : chunks) {
-                Path path = new Path(deviceID, chunk.getMeasurementUid());
-                chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(chunk);
-              }
-            }
+            chunkGroupMetadataList.add(new Pair<>(deviceID, chunkMetadataList));
             newChunkGroup = true;
             truncatedPosition = this.position();
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index c1fa885..3abb4e7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -59,7 +59,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   /**
    * all chunk group metadata which have been serialized on disk.
    */
-  private Map<String, Map<String, List<ChunkMetadata>>> metadatas = new HashMap<>();
+  private Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = new HashMap<>();
 
   /**
    * @param file a given tsfile path you want to (continue to) write
@@ -90,7 +90,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
         }
 
         // uncompleted file
-        truncatedPosition = reader.selfCheck(knownSchemas, chunkMetadataListMap, true);
+        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
         totalChunkNum = reader.getTotalChunkNum();
         if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
           out.close();
@@ -168,8 +168,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId,
       TSDataType dataType) {
     List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-    if (metadatas.containsKey(deviceId) && metadatas.get(deviceId).containsKey(measurementId)) {
-      for (ChunkMetadata chunkMetaData : metadatas.get(deviceId).get(measurementId)) {
+    if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+      for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) {
         // filter: if adevice'sensor is defined as float type, and data has been persistent.
         // Then someone deletes the timeseries and recreate it with Int type. We have to ignore
         // all the stale data.
@@ -181,6 +181,10 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     return chunkMetadataList;
   }
 
+  public Map<String, Map<String, List<ChunkMetadata>>> getMetadatasForQuery() {
+    return metadatasForQuery;
+  }
+
   /**
    * add all appendChunkMetadatas into memory. After calling this method, other classes can
    * read these metadata.
@@ -194,13 +198,13 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
         String deviceId = pair.left;
         for (ChunkMetadata chunkMetaData : rowMetaDataList) {
           String measurementId = chunkMetaData.getMeasurementUid();
-          if (!metadatas.containsKey(deviceId)) {
-            metadatas.put(deviceId, new HashMap<>());
+          if (!metadatasForQuery.containsKey(deviceId)) {
+            metadatasForQuery.put(deviceId, new HashMap<>());
           }
-          if (!metadatas.get(deviceId).containsKey(measurementId)) {
-            metadatas.get(deviceId).put(measurementId, new ArrayList<>());
+          if (!metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+            metadatasForQuery.get(deviceId).put(measurementId, new ArrayList<>());
           }
-          metadatas.get(deviceId).get(measurementId).add(chunkMetaData);
+          metadatasForQuery.get(deviceId).get(measurementId).add(chunkMetaData);
         }
       }
     }
@@ -218,10 +222,10 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
    */
   private List<Pair<String, List<ChunkMetadata>>> getAppendedRowMetadata() {
     List<Pair<String, List<ChunkMetadata>>> append = new ArrayList<>();
-    if (lastFlushedChunkGroupIndex < chunkGroupInfoList.size()) {
-      append.addAll(chunkGroupInfoList
-          .subList(lastFlushedChunkGroupIndex, chunkGroupInfoList.size()));
-      lastFlushedChunkGroupIndex = chunkGroupInfoList.size();
+    if (lastFlushedChunkGroupIndex < chunkGroupMetadataList.size()) {
+      append.addAll(chunkGroupMetadataList
+          .subList(lastFlushedChunkGroupIndex, chunkGroupMetadataList.size()));
+      lastFlushedChunkGroupIndex = chunkGroupMetadataList.size();
     }
     return append;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 0339f5b..d19344e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.write.writer;
 
+import java.util.Iterator;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -65,14 +66,19 @@ public class TsFileIOWriter {
   }
 
   protected TsFileOutput out;
-  protected List<Pair<String, List<ChunkMetadata>>> chunkGroupInfoList = new ArrayList<>();
   protected boolean canWrite = true;
   protected int totalChunkNum = 0;
   protected int invalidChunkNum;
   protected File file;
-  protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-  protected Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+
+  // current flushed Chunk
   private ChunkMetadata currentChunkMetadata;
+  // current flushed ChunkGroup
+  protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
+  // all flushed ChunkGroup,  device -> List<ChunkMetadata>
+  protected List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList = new ArrayList<>();
+
   private long markedPosition;
   private String deviceId;
   private long currentChunkGroupStartOffset;
@@ -144,7 +150,7 @@ public class TsFileIOWriter {
     ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(deviceId, dataSize,
         chunkMetadataList.size());
     chunkGroupFooter.serializeTo(out.wrapAsStream());
-    chunkGroupInfoList.add(new Pair<>(deviceId, chunkMetadataList));
+    chunkGroupMetadataList.add(new Pair<>(deviceId, chunkMetadataList));
     logger.debug("end chunk group:{}", chunkMetadataList);
     deviceId = null;
     chunkMetadataList = null;
@@ -204,8 +210,6 @@ public class TsFileIOWriter {
    */
   public void endCurrentChunk() {
     chunkMetadataList.add(currentChunkMetadata);
-    Path path = new Path(deviceId, currentChunkMetadata.getMeasurementUid());
-    chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(currentChunkMetadata);
     currentChunkMetadata = null;
     totalChunkNum++;
   }
@@ -219,10 +223,17 @@ public class TsFileIOWriter {
 
     // serialize the SEPARATOR of MetaData
     ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-    
-    logger.debug("get time series list:{}", chunkMetadataListMap.keySet());
-    
-    Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList();
+
+    // group ChunkMetadata by series
+    Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+    for (Pair<String, List<ChunkMetadata>> chunkGroupMetadata: chunkGroupMetadataList) {
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.right) {
+        Path series = new Path(chunkGroupMetadata.left, chunkMetadata.getMeasurementUid());
+        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+      }
+    }
+
+    Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
     
     TsFileMetadata tsFileMetaData = new TsFileMetadata();
     tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
@@ -231,7 +242,9 @@ public class TsFileIOWriter {
     tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
 
     long footerIndex = out.getPosition();
-    logger.debug("start to flush the footer,file pos:{}", footerIndex);
+    if (logger.isDebugEnabled()) {
+      logger.debug("start to flush the footer,file pos:{}", footerIndex);
+    }
 
     // write TsFileMetaData
     int size = tsFileMetaData.serializeTo(out.wrapAsStream());
@@ -253,19 +266,18 @@ public class TsFileIOWriter {
 
     // close file
     out.close();
-    if (resourceLogger.isInfoEnabled() && file != null) {
-      resourceLogger.info("{} writer is closed.", file.getName());
+    if (resourceLogger.isDebugEnabled() && file != null) {
+      resourceLogger.debug("{} writer is closed.", file.getName());
     }
     canWrite = false;
-    chunkMetadataListMap = new TreeMap<>();
-    logger.info("output stream is closed");
   }
 
   /**
    * Flush ChunkMetadataList and TimeseriesMetaData
    * @return DeviceMetaDataMap in TsFileMetaData
    */
-  private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList() throws IOException {
+  private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList(
+      Map<Path, List<ChunkMetadata>> chunkMetadataListMap) throws IOException {
 
     // convert ChunkMetadataList to this field
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new LinkedHashMap<>();
@@ -304,7 +316,7 @@ public class TsFileIOWriter {
         size += timeseriesMetaData.serializeTo(out.wrapAsStream());
       }
       deviceMetadataMap
-          .put(device, new Pair<Long, Integer>(offsetOfFirstTimeseriesMetaDataInDevice, size));
+          .put(device, new Pair<>(offsetOfFirstTimeseriesMetaDataInDevice, size));
     }
     // return
     return deviceMetadataMap;
@@ -323,11 +335,10 @@ public class TsFileIOWriter {
   // device -> ChunkMetadataList
   public Map<String, List<ChunkMetadata>> getDeviceChunkMetadataMap() {
     Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();
-    for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      Path path = entry.getKey();
-      String device = path.getDevice();
-      deviceChunkMetadataMap.computeIfAbsent(device, k -> new ArrayList<>())
-          .addAll(entry.getValue());
+
+    for (Pair<String, List<ChunkMetadata>> chunkGroupMetadata : chunkGroupMetadataList) {
+      deviceChunkMetadataMap.computeIfAbsent(chunkGroupMetadata.left, k -> new ArrayList<>())
+          .addAll(chunkGroupMetadata.right);
     }
     return deviceChunkMetadataMap;
   }
@@ -376,22 +387,26 @@ public class TsFileIOWriter {
   /**
    * Remove such ChunkMetadata that its startTime is not in chunkStartTimes
    */
-
   public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
     Map<Path, Integer> startTimeIdxes = new HashMap<>();
     chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
 
-    for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      List<ChunkMetadata> chunkMetadatas = entry.getValue();
-      Path path = entry.getKey();
-      int chunkNum = chunkMetadatas.size();
-      for (ChunkMetadata chunkMetaData : chunkMetadatas) {
+    Iterator<Pair<String, List<ChunkMetadata>>> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
+    while (chunkGroupMetaDataIterator.hasNext()) {
+      Pair<String, List<ChunkMetadata>> chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+      String deviceId = chunkGroupMetaData.left;
+      int chunkNum = chunkGroupMetaData.right.size();
+      Iterator<ChunkMetadata> chunkMetaDataIterator = chunkGroupMetaData.right.iterator();
+      while (chunkMetaDataIterator.hasNext()) {
+        ChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
+        Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
         int startTimeIdx = startTimeIdxes.get(path);
+
         List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
         boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
             && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
         if (!chunkValid) {
-          chunkMetadatas.remove(chunkMetaData);
+          chunkMetaDataIterator.remove();
           chunkNum--;
           invalidChunkNum++;
         } else {
@@ -399,7 +414,7 @@ public class TsFileIOWriter {
         }
       }
       if (chunkNum == 0) {
-        chunkMetadataListMap.remove(path);
+        chunkGroupMetaDataIterator.remove();
       }
     }
   }

Reply via email to