This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch fix_closed_channel_issue
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/fix_closed_channel_issue by
this push:
new bbf4a19 fix the bug for FileNodeProcessor merge process
bbf4a19 is described below
commit bbf4a19bf89cbd9be936cf56c13c5304eebec641
Author: cgf16 <[email protected]>
AuthorDate: Mon Mar 25 14:06:45 2019 +0800
fix the bug for FileNodeProcessor merge process
---
.../db/engine/filenode/FileNodeProcessor.java | 29 ++++++++++++++--------
.../db/engine/overflow/io/OverflowProcessor.java | 2 ++
.../db/engine/overflow/io/OverflowResource.java | 1 +
.../db/query/control/OpenedFilePathsManager.java | 4 +--
.../db/query/factory/SeriesReaderFactory.java | 10 --------
5 files changed, 23 insertions(+), 23 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 889f816..32e83e0 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -1352,6 +1352,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// overflow switch from merge to work
overflowProcessor.switchMergeToWork();
+
// write status to file
isMerging = FileNodeProcessorStatus.NONE;
synchronized (fileNodeProcessorStore) {
@@ -1415,8 +1416,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
for (File file : files) {
if (!bufferFiles.contains(file.getPath())) {
- System.out.println("TO delete file ~~~~ " + file.getPath());
-
FileReaderManager.getInstance().closeFileAndRemoveReader(file.getAbsolutePath());
+
FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPath());
if (!file.delete()) {
LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
}
@@ -1447,6 +1447,8 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
mergeDeleteLock.lock();
QueryContext context = new QueryContext();
try {
+
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
+ true);
for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
// query one deviceId
List<Path> pathList = new ArrayList<>();
@@ -1477,12 +1479,16 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new
SingleSeriesExpression(path, timeFilter);
+ for (OverflowInsertFile overflowInsertFile :
overflowSeriesDataSource.getOverflowInsertFileList()) {
+
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
+ false);
+ }
+
IReader seriesReader = SeriesReaderFactory.getInstance()
.createSeriesReaderForMerge(backupIntervalFile,
overflowSeriesDataSource, seriesFilter, context);
-
numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter,
dataType,
- startTimeMap, endTimeMap, backupIntervalFile,
overflowSeriesDataSource);
+ startTimeMap, endTimeMap, overflowSeriesDataSource);
}
if (mergeIsChunkGroupHasData) {
// end the new rowGroupMetadata
@@ -1492,6 +1498,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
} finally {
+
FileReaderManager.getInstance().decreaseFileReaderReference(backupIntervalFile.getFilePath(),
+ true);
+
if (mergeDeleteLock.isLocked()) {
mergeDeleteLock.unlock();
}
@@ -1513,7 +1522,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
private int queryAndWriteSeries(IReader seriesReader, Path path,
SingleSeriesExpression seriesFilter, TSDataType dataType,
Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
- TsFileResource tsFileResource, OverflowSeriesDataSource
overflowSeriesDataSource)
+ OverflowSeriesDataSource overflowSeriesDataSource)
throws IOException {
int numOfChunk = 0;
try {
@@ -1559,12 +1568,10 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
seriesWriterImpl.writeToFileWriter(mergeFileWriter);
}
} finally {
-//
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource.getFilePath(),
-// true);
-// for (OverflowInsertFile overflowInsertFile :
overflowSeriesDataSource.getOverflowInsertFileList()) {
-//
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
-// false);
-// }
+ for (OverflowInsertFile overflowInsertFile :
overflowSeriesDataSource.getOverflowInsertFileList()) {
+
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+ false);
+ }
}
return numOfChunk;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index b4669ec..0debfe4 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -409,6 +410,7 @@ public class OverflowProcessor extends Processor {
public void switchMergeToWork() throws IOException {
if (mergeResource != null) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(mergeResource.getInsertFilePath());
mergeResource.close();
mergeResource.deleteResource();
mergeResource = null;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 67916b2..fbaf317 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 585d85d..bc6ee38 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -94,11 +94,11 @@ public class OpenedFilePathsManager {
jobIdContainer.remove();
for (String filePath : closedFilePathsMap.get(jobId)) {
- FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
false);
+ FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
true);
}
closedFilePathsMap.remove(jobId);
for (String filePath : unclosedFilePathsMap.get(jobId)) {
- FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
true);
+ FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
false);
}
unclosedFilePathsMap.remove(jobId);
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index e22d7ce..42e81f2 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -79,11 +79,6 @@ public class SeriesReaderFactory {
for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
.getOverflowInsertFileList()) {
- // add current overflowInsertFile reference to FileReaderManager
- // to avoid that this reader is cleared in fix time
-//
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
-// false);
-
// store only one opened file stream into manager, to avoid too many
opened files
TsFileSequenceReader unClosedTsFileReader =
FileReaderManager.getInstance()
.get(overflowInsertFile.getFilePath(), false);
@@ -166,11 +161,6 @@ public class SeriesReaderFactory {
QueryContext context)
throws IOException {
- // add current tsfile reference to FileReaderManager
- // to avoid that this reader is cleared in fix time
-//
FileReaderManager.getInstance().increaseFileReaderReference(fileNode.getFilePath(),
-// true);
-
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(fileNode.getFilePath(), true);
ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);