This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch fix_closed_channel_issue
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/fix_closed_channel_issue by this push:
     new bbf4a19  fix the bug for FileNodeProcess merge process
bbf4a19 is described below

commit bbf4a19bf89cbd9be936cf56c13c5304eebec641
Author: cgf16 <[email protected]>
AuthorDate: Mon Mar 25 14:06:45 2019 +0800

    fix the bug for FileNodeProcess merge process
---
 .../db/engine/filenode/FileNodeProcessor.java      | 29 ++++++++++++++--------
 .../db/engine/overflow/io/OverflowProcessor.java   |  2 ++
 .../db/engine/overflow/io/OverflowResource.java    |  1 +
 .../db/query/control/OpenedFilePathsManager.java   |  4 +--
 .../db/query/factory/SeriesReaderFactory.java      | 10 --------
 5 files changed, 23 insertions(+), 23 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 889f816..32e83e0 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -1352,6 +1352,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
 
         // overflow switch from merge to work
         overflowProcessor.switchMergeToWork();
+
         // write status to file
         isMerging = FileNodeProcessorStatus.NONE;
         synchronized (fileNodeProcessorStore) {
@@ -1415,8 +1416,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
       }
       for (File file : files) {
         if (!bufferFiles.contains(file.getPath())) {
-          System.out.println("TO delete file ~~~~  " + file.getPath());
-          
FileReaderManager.getInstance().closeFileAndRemoveReader(file.getAbsolutePath());
+          
FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPath());
           if (!file.delete()) {
             LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
           }
@@ -1447,6 +1447,8 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
     mergeDeleteLock.lock();
     QueryContext context = new QueryContext();
     try {
+      
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
+          true);
       for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
         // query one deviceId
         List<Path> pathList = new ArrayList<>();
@@ -1477,12 +1479,16 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
                   TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
           SingleSeriesExpression seriesFilter = new 
SingleSeriesExpression(path, timeFilter);
 
+          for (OverflowInsertFile overflowInsertFile : 
overflowSeriesDataSource.getOverflowInsertFileList()) {
+            
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
+                false);
+          }
+
           IReader seriesReader = SeriesReaderFactory.getInstance()
               .createSeriesReaderForMerge(backupIntervalFile,
                   overflowSeriesDataSource, seriesFilter, context);
-
           numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, 
dataType,
-              startTimeMap, endTimeMap, backupIntervalFile, 
overflowSeriesDataSource);
+              startTimeMap, endTimeMap, overflowSeriesDataSource);
         }
         if (mergeIsChunkGroupHasData) {
           // end the new rowGroupMetadata
@@ -1492,6 +1498,9 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         }
       }
     } finally {
+      
FileReaderManager.getInstance().decreaseFileReaderReference(backupIntervalFile.getFilePath(),
+          true);
+
       if (mergeDeleteLock.isLocked()) {
         mergeDeleteLock.unlock();
       }
@@ -1513,7 +1522,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
   private int queryAndWriteSeries(IReader seriesReader, Path path,
       SingleSeriesExpression seriesFilter, TSDataType dataType,
       Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
-      TsFileResource tsFileResource, OverflowSeriesDataSource 
overflowSeriesDataSource)
+      OverflowSeriesDataSource overflowSeriesDataSource)
       throws IOException {
     int numOfChunk = 0;
     try {
@@ -1559,12 +1568,10 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         seriesWriterImpl.writeToFileWriter(mergeFileWriter);
       }
     } finally {
-//      
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource.getFilePath(),
-//              true);
-//      for (OverflowInsertFile overflowInsertFile : 
overflowSeriesDataSource.getOverflowInsertFileList()) {
-//        
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
-//                false);
-//      }
+      for (OverflowInsertFile overflowInsertFile : 
overflowSeriesDataSource.getOverflowInsertFileList()) {
+        
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+                false);
+      }
     }
     return numOfChunk;
   }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index b4669ec..0debfe4 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -409,6 +410,7 @@ public class OverflowProcessor extends Processor {
 
   public void switchMergeToWork() throws IOException {
     if (mergeResource != null) {
+      
FileReaderManager.getInstance().closeFileAndRemoveReader(mergeResource.getInsertFilePath());
       mergeResource.close();
       mergeResource.deleteResource();
       mergeResource = null;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 67916b2..fbaf317 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 585d85d..bc6ee38 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -94,11 +94,11 @@ public class OpenedFilePathsManager {
       jobIdContainer.remove();
 
       for (String filePath : closedFilePathsMap.get(jobId)) {
-        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
false);
+        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
true);
       }
       closedFilePathsMap.remove(jobId);
       for (String filePath : unclosedFilePathsMap.get(jobId)) {
-        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
true);
+        FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
false);
       }
       unclosedFilePathsMap.remove(jobId);
     }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index e22d7ce..42e81f2 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -79,11 +79,6 @@ public class SeriesReaderFactory {
     for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
         .getOverflowInsertFileList()) {
 
-      // add current overflowInsertFile reference to FileReaderManager
-      // to avoid that this reader is cleared in fix time
-//      
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
-//              false);
-
       // store only one opened file stream into manager, to avoid too many 
opened files
       TsFileSequenceReader unClosedTsFileReader = 
FileReaderManager.getInstance()
           .get(overflowInsertFile.getFilePath(), false);
@@ -166,11 +161,6 @@ public class SeriesReaderFactory {
       QueryContext context)
       throws IOException {
 
-    // add current tsfile reference to FileReaderManager
-    // to avoid that this reader is cleared in fix time
-//    
FileReaderManager.getInstance().increaseFileReaderReference(fileNode.getFilePath(),
-//            true);
-
     TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
         .get(fileNode.getFilePath(), true);
     ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);

Reply via email to