This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch delete_dev4
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/delete_dev4 by this push:
new ee6fe3f fix indent
ee6fe3f is described below
commit ee6fe3f141a7fe2ab3a98e11a237c56af1d65454
Author: 江天 <[email protected]>
AuthorDate: Tue Feb 19 17:45:06 2019 +0800
fix indent
---
.../db/engine/filenode/FileNodeProcessor.java | 417 ++++++++++-----------
1 file changed, 208 insertions(+), 209 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index d5fcf58..98316e6 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -118,7 +118,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
private final HashMap<String, AtomicLong> statParamsHashMap = new
HashMap<String, AtomicLong>() {
{
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
- MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
put(statConstant.name(), new AtomicLong(0));
}
}
@@ -220,16 +220,16 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* constructor of FileNodeProcessor.
*/
public FileNodeProcessor(String fileNodeDirPath, String processorName)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
super(processorName);
statStorageDeltaName =
- MonitorConstants.statStorageGroupPrefix +
MonitorConstants.MONITOR_PATH_SEPERATOR
- + MonitorConstants.fileNodePath +
MonitorConstants.MONITOR_PATH_SEPERATOR
- + processorName.replaceAll("\\.", "_");
+ MonitorConstants.statStorageGroupPrefix +
MonitorConstants.MONITOR_PATH_SEPERATOR
+ + MonitorConstants.fileNodePath +
MonitorConstants.MONITOR_PATH_SEPERATOR
+ + processorName.replaceAll("\\.", "_");
this.parameters = new HashMap<>();
if (fileNodeDirPath.length() > 0
- && fileNodeDirPath.charAt(fileNodeDirPath.length() - 1) !=
File.separatorChar) {
+ && fileNodeDirPath.charAt(fileNodeDirPath.length() - 1) !=
File.separatorChar) {
fileNodeDirPath = fileNodeDirPath + File.separatorChar;
}
this.baseDirPath = fileNodeDirPath + processorName;
@@ -237,16 +237,16 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
if (!dataDir.exists()) {
dataDir.mkdirs();
LOGGER.info(
- "The data directory of the filenode processor {} doesn't exist.
Create new directory {}",
- getProcessorName(), baseDirPath);
+ "The data directory of the filenode processor {} doesn't exist.
Create new directory {}",
+ getProcessorName(), baseDirPath);
}
fileNodeRestoreFilePath = new File(dataDir, processorName +
RESTORE_FILE_SUFFIX).getPath();
try {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
LOGGER.error(
- "The fileNode processor {} encountered an error when recoverying
restore information.",
- processorName, e);
+ "The fileNode processor {} encountered an error when recoverying
restore information.",
+ processorName, e);
throw new FileNodeProcessorException(e);
}
// TODO deep clone the lastupdate time
@@ -269,7 +269,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
}
// status is not NONE, or the last intervalFile is not closed
if (isMerging != FileNodeProcessorStatus.NONE
- || (!newFileNodes.isEmpty() &&
!newFileNodes.get(newFileNodes.size() - 1).isClosed())) {
+ || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed())) {
shouldRecovery = true;
} else {
// add file into the index of file
@@ -297,9 +297,9 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
HashMap<String, String> hashMap = new HashMap<String, String>() {
{
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
- MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR +
statConstant.name(),
- MonitorConstants.DataType);
+ MonitorConstants.DataType);
}
}
};
@@ -310,9 +310,9 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
public List<String> getAllPathForStatistic() {
List<String> list = new ArrayList<>();
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
- MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
list.add(
- statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR +
statConstant.name());
+ statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR +
statConstant.name());
}
return list;
}
@@ -351,10 +351,10 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* add interval FileNode.
*/
public void addIntervalFileNode(long startTime, String baseDir, String
fileName)
- throws Exception {
+ throws Exception {
IntervalFileNode intervalFileNode = new
IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
- fileName);
+ fileName);
this.currentIntervalFileNode = intervalFileNode;
newFileNodes.add(intervalFileNode);
fileNodeProcessorStore.setNewFileNodes(newFileNodes);
@@ -449,38 +449,38 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION,
bufferwriteFlushAction);
parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION,
bufferwriteCloseAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
String baseDir = directories
- .getTsFileFolder(newFileNodes.get(newFileNodes.size() -
1).getBaseDirIndex());
+ .getTsFileFolder(newFileNodes.get(newFileNodes.size() -
1).getBaseDirIndex());
LOGGER.info(
- "The filenode processor {} will recovery the bufferwrite
processor, "
- + "the bufferwrite file is {}",
- getProcessorName(), fileNames[fileNames.length - 1]);
+ "The filenode processor {} will recovery the bufferwrite processor, "
+ + "the bufferwrite file is {}",
+ getProcessorName(), fileNames[fileNames.length - 1]);
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir,
getProcessorName(),
- fileNames[fileNames.length - 1], parameters,
versionController, fileSchema);
+ fileNames[fileNames.length - 1], parameters, versionController,
fileSchema);
} catch (BufferWriteProcessorException e) {
// unlock
writeUnlock();
LOGGER.error(
- "The filenode processor {} failed to recovery the bufferwrite
processor, "
- + "the last bufferwrite file is {}.",
- getProcessorName(), fileNames[fileNames.length - 1]);
+ "The filenode processor {} failed to recovery the bufferwrite
processor, "
+ + "the last bufferwrite file is {}.",
+ getProcessorName(), fileNames[fileNames.length - 1]);
throw new FileNodeProcessorException(e);
}
}
// restore the overflow processor
LOGGER.info("The filenode processor {} will recovery the overflow
processor.",
- getProcessorName());
+ getProcessorName());
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION,
overflowFlushAction);
parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
try {
overflowProcessor = new OverflowProcessor(getProcessorName(),
parameters, fileSchema,
- versionController);
+ versionController);
} catch (IOException e) {
writeUnlock();
LOGGER.error("The filenode processor {} failed to recovery the overflow
processor.",
- getProcessorName());
+ getProcessorName());
throw new FileNodeProcessorException(e);
}
@@ -490,14 +490,14 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// re-merge all file
// if bufferwrite processor is not null, and close
LOGGER.info("The filenode processor {} is recovering, the filenode
status is {}.",
- getProcessorName(),
- isMerging);
+ getProcessorName(),
+ isMerging);
merge();
} else if (isMerging == FileNodeProcessorStatus.WAITING) {
// unlock
LOGGER.info("The filenode processor {} is recovering, the filenode
status is {}.",
- getProcessorName(),
- isMerging);
+ getProcessorName(),
+ isMerging);
writeUnlock();
switchWaitingToWorkingv2(newFileNodes);
} else {
@@ -511,23 +511,23 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* get buffer write processor by processor name and insert time.
*/
public BufferWriteProcessor getBufferWriteProcessor(String processorName,
long insertTime)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
if (bufferWriteProcessor == null) {
Map<String, Action> parameters = new HashMap<>();
parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION,
bufferwriteFlushAction);
parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION,
bufferwriteCloseAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
String baseDir = directories.getNextFolderForTsfile();
LOGGER.info("Allocate folder {} for the new bufferwrite processor.",
baseDir);
// construct processor or restore
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
- insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR +
System.currentTimeMillis(),
- parameters, versionController, fileSchema);
+ insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR +
System.currentTimeMillis(),
+ parameters, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
LOGGER.error("The filenode processor {} failed to get the bufferwrite
processor.",
- processorName, e);
+ processorName, e);
throw new FileNodeProcessorException(e);
}
}
@@ -554,9 +554,9 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
// construct processor or restore
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION,
overflowFlushAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
overflowProcessor = new OverflowProcessor(processorName, parameters,
fileSchema,
- versionController);
+ versionController);
}
return overflowProcessor;
}
@@ -626,9 +626,9 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
public void changeTypeToChanged(String deviceId, long timestamp) {
if (!invertedindexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
- "Can not find any tsfile which will be overflowed in the
filenode processor {}, "
- + "the data is [device:{},time:{}]",
- getProcessorName(), deviceId, timestamp);
+ "Can not find any tsfile which will be overflowed in the filenode
processor {}, "
+ + "the data is [device:{},time:{}]",
+ getProcessorName(), deviceId, timestamp);
emptyIntervalFileNode.setStartTime(deviceId, 0L);
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -648,9 +648,9 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
public void changeTypeToChanged(String deviceId, long startTime, long
endTime) {
if (!invertedindexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
- "Can not find any tsfile which will be overflowed in the
filenode processor {}, "
- + "the data is [device:{}, start time:{}, end time:{}]",
- getProcessorName(), deviceId, startTime, endTime);
+ "Can not find any tsfile which will be overflowed in the filenode
processor {}, "
+ + "the data is [device:{}, start time:{}, end time:{}]",
+ getProcessorName(), deviceId, startTime, endTime);
emptyIntervalFileNode.setStartTime(deviceId, 0L);
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -673,9 +673,9 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
if (!invertedindexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
- "Can not find any tsfile which will be overflowed in the
filenode processor {}, "
- + "the data is [device:{}, delete time:{}]",
- getProcessorName(), deviceId, timestamp);
+ "Can not find any tsfile which will be overflowed in the filenode
processor {}, "
+ + "the data is [device:{}, delete time:{}]",
+ getProcessorName(), deviceId, timestamp);
emptyIntervalFileNode.setStartTime(deviceId, 0L);
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -697,7 +697,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
* @return index of interval
*/
private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
- List<IntervalFileNode> fileList) {
+ List<IntervalFileNode> fileList) {
int index = 1;
while (index < fileList.size()) {
if (timestamp < fileList.get(index).getStartTime(deviceId)) {
@@ -731,19 +731,19 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
newMultiPassLock.readLock().unlock();
newMultiPassTokenSet.remove(token);
LOGGER
- .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}",
token, getProcessorName(),
- newMultiPassTokenSet, newMultiPassLock);
+ .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}",
token, getProcessorName(),
+ newMultiPassTokenSet, newMultiPassLock);
return true;
} else if (oldMultiPassTokenSet != null &&
oldMultiPassTokenSet.contains(token)) {
// remove token first, then unlock
oldMultiPassLock.readLock().unlock();
oldMultiPassTokenSet.remove(token);
LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token,
oldMultiPassTokenSet,
- oldMultiPassLock);
+ oldMultiPassLock);
return true;
} else {
LOGGER.error("remove token error:{},new set:{}, old set:{}", token,
newMultiPassTokenSet,
- oldMultiPassTokenSet);
+ oldMultiPassTokenSet);
// should add throw exception
return false;
}
@@ -753,8 +753,8 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
* query data.
*/
public <T extends Comparable<T>> QueryDataSource query(String deviceId,
String measurementId,
- Filter filter,
QueryContext context)
- throws FileNodeProcessorException {
+ Filter filter, QueryContext context)
+ throws FileNodeProcessorException {
// query overflow data
TSDataType dataType = null;
try {
@@ -765,7 +765,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
OverflowSeriesDataSource overflowSeriesDataSource;
try {
overflowSeriesDataSource = overflowProcessor.query(deviceId,
measurementId, filter, dataType,
- context);
+ context);
} catch (IOException e) {
e.printStackTrace();
throw new FileNodeProcessorException(e);
@@ -779,31 +779,31 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata
- = new Pair<ReadOnlyMemChunk, List<ChunkMetaData>>(null, null);
+ = new Pair<ReadOnlyMemChunk, List<ChunkMetaData>>(null, null);
// bufferwrite data
UnsealedTsFile unsealedTsFile = null;
if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed()
- && !newFileNodes.get(newFileNodes.size() -
1).getStartTimeMap().isEmpty()) {
+ && !newFileNodes.get(newFileNodes.size() -
1).getStartTimeMap().isEmpty()) {
unsealedTsFile = new UnsealedTsFile();
unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() -
1).getFilePath());
if (bufferWriteProcessor == null) {
LOGGER.error(
- "The last of tsfile {} in filenode processor {} is not closed,
"
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName());
+ "The last of tsfile {} in filenode processor {} is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName());
throw new FileNodeProcessorException(String.format(
- "The last of tsfile %s in filenode processor %s is not closed,
"
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName()));
+ "The last of tsfile %s in filenode processor %s is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName()));
}
bufferwritedata = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType);
+ .queryBufferWriteData(deviceId, measurementId, dataType);
try {
List<Modification> pathModifications = context.getPathModifications(
- currentIntervalFileNode.getModFile(), deviceId
- + IoTDBConstant.PATH_SEPARATOR + measurementId
+ currentIntervalFileNode.getModFile(), deviceId
+ + IoTDBConstant.PATH_SEPARATOR + measurementId
);
if (pathModifications.size() > 0) {
QueryUtils.modifyChunkMetaData(bufferwritedata.right,
pathModifications);
@@ -815,8 +815,8 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
}
GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new
GlobalSortedSeriesDataSource(
- new Path(deviceId + "." + measurementId), bufferwriteDataInFiles,
unsealedTsFile,
- bufferwritedata.left);
+ new Path(deviceId + "." + measurementId), bufferwriteDataInFiles,
unsealedTsFile,
+ bufferwritedata.left);
return new QueryDataSource(globalSortedSeriesDataSource,
overflowSeriesDataSource);
}
@@ -824,11 +824,11 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
/**
* append one specified tsfile to this filenode processor.
*
- * @param appendFile the appended tsfile information
+ * @param appendFile the appended tsfile information
* @param appendFilePath the seriesPath of appended file
*/
public void appendFile(IntervalFileNode appendFile, String appendFilePath)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
try {
if (!new File(appendFile.getFilePath()).getParentFile().exists()) {
new File(appendFile.getFilePath()).getParentFile().mkdirs();
@@ -838,11 +838,11 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
File targetFile = new File(appendFile.getFilePath());
if (!originFile.exists()) {
throw new FileNodeProcessorException(
- String.format("The appended file %s does not exist.",
appendFilePath));
+ String.format("The appended file %s does not exist.",
appendFilePath));
}
if (targetFile.exists()) {
throw new FileNodeProcessorException(
- String.format("The appended target file %s already exists.",
appendFile.getFilePath()));
+ String.format("The appended target file %s already exists.",
appendFile.getFilePath()));
}
originFile.renameTo(targetFile);
// append the new tsfile
@@ -858,7 +858,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
addAllFileIntoIndex(newFileNodes);
} catch (Exception e) {
LOGGER.error("Failed to append the tsfile {} to filenode processor {}.",
appendFile,
- getProcessorName(), e);
+ getProcessorName(), e);
throw new FileNodeProcessorException(e);
}
}
@@ -869,7 +869,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
* @param appendFile the appended tsfile information
*/
public List<String> getOverlapFiles(IntervalFileNode appendFile, String uuid)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
List<String> overlapFiles = new ArrayList<>();
try {
for (IntervalFileNode intervalFileNode : newFileNodes) {
@@ -878,19 +878,19 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
continue;
}
if (intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
- && intervalFileNode.getStartTime(entry.getKey()) <=
appendFile
- .getEndTime(entry.getKey())) {
+ && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
+ .getEndTime(entry.getKey())) {
String relativeFilePath = "postback" + File.separator + uuid +
File.separator + "backup"
- + File.separator + intervalFileNode.getRelativePath();
+ + File.separator + intervalFileNode.getRelativePath();
File newFile = new File(
-
Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
- relativeFilePath);
+
Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
+ relativeFilePath);
if (!newFile.getParentFile().exists()) {
newFile.getParentFile().mkdirs();
}
java.nio.file.Path link =
FileSystems.getDefault().getPath(newFile.getPath());
java.nio.file.Path target = FileSystems.getDefault()
- .getPath(intervalFileNode.getFilePath());
+ .getPath(intervalFileNode.getFilePath());
Files.createLink(link, target);
overlapFiles.add(newFile.getPath());
break;
@@ -909,7 +909,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
*/
public void addTimeSeries(String measurementToString, String dataType,
String encoding) {
ColumnSchema col = new ColumnSchema(measurementToString,
TSDataType.valueOf(dataType),
- TSEncoding.valueOf(encoding));
+ TSEncoding.valueOf(encoding));
JSONObject measurement = constrcutMeasurement(col);
fileSchema.registerMeasurement(JsonConverter.convertJsonToMeasurementSchema(measurement));
}
@@ -941,32 +941,32 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
long thisMergeTime = System.currentTimeMillis();
long mergeTimeInterval = thisMergeTime - lastMergeTime;
ZonedDateTime lastDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastMergeTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime thisDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisMergeTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The filenode {} last merge time is {}, this merge time is {}, "
- + "merge time interval is {}s",
- getProcessorName(), lastDateTime, thisDateTime,
mergeTimeInterval / 1000);
+ "The filenode {} last merge time is {}, this merge time is {}, "
+ + "merge time interval is {}s",
+ getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval /
1000);
}
lastMergeTime = System.currentTimeMillis();
if (overflowProcessor != null) {
if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
- .getConfig().overflowFileSizeThreshold) {
+ .getConfig().overflowFileSizeThreshold) {
LOGGER.info(
- "Skip this merge taks submission, because the size{} of
overflow processor {} "
- + "does not reaches the threshold {}.",
- MemUtils.bytesCntToStr(overflowProcessor.getFileSize()),
getProcessorName(),
- MemUtils.bytesCntToStr(
-
IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+ "Skip this merge taks submission, because the size{} of overflow
processor {} "
+ + "does not reaches the threshold {}.",
+ MemUtils.bytesCntToStr(overflowProcessor.getFileSize()),
getProcessorName(),
+ MemUtils.bytesCntToStr(
+
IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
return null;
}
} else {
LOGGER.info(
- "Skip this merge taks submission, because the filenode processor
{} "
- + "has no overflow processor.",
- getProcessorName());
+ "Skip this merge taks submission, because the filenode processor {} "
+ + "has no overflow processor.",
+ getProcessorName());
return null;
}
if (isOverflowed && isMerging == FileNodeProcessorStatus.NONE) {
@@ -978,18 +978,18 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
merge();
long mergeEndTime = System.currentTimeMillis();
ZonedDateTime startDateTime = ZonedDateTime
- .ofInstant(Instant.ofEpochMilli(mergeStartTime),
-
IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(mergeEndTime),
+ .ofInstant(Instant.ofEpochMilli(mergeStartTime),
IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(mergeEndTime),
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
long intervalTime = mergeEndTime - mergeStartTime;
LOGGER.info(
- "The filenode processor {} merge start time is {}, "
- + "merge end time is {}, merge consumes {}ms.",
- getProcessorName(), startDateTime, endDateTime,
intervalTime);
+ "The filenode processor {} merge start time is {}, "
+ + "merge end time is {}, merge consumes {}ms.",
+ getProcessorName(), startDateTime, endDateTime, intervalTime);
} catch (FileNodeProcessorException e) {
LOGGER.error("The filenode processor {} encountered an error when
merging.",
- getProcessorName(), e);
+ getProcessorName(), e);
throw new ErrorDebugException(e);
}
};
@@ -998,13 +998,13 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
} else {
if (!isOverflowed) {
LOGGER.info(
- "Skip this merge taks submission, because the filenode
processor {} is not overflowed.",
- getProcessorName());
+ "Skip this merge taks submission, because the filenode processor
{} is not overflowed.",
+ getProcessorName());
} else {
LOGGER.warn(
- "Skip this merge task submission, because last merge task is
not over yet, "
- + "the merge filenode processor is {}",
- getProcessorName());
+ "Skip this merge task submission, because last merge task is not
over yet, "
+ + "the merge filenode processor is {}",
+ getProcessorName());
}
}
return null;
@@ -1016,7 +1016,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
private void prepareForMerge() {
try {
LOGGER.info("The filenode processor {} prepares for merge, closes the
bufferwrite processor",
- getProcessorName());
+ getProcessorName());
closeBufferWrite();
// try to get overflow processor
getOverflowProcessor(getProcessorName());
@@ -1024,16 +1024,16 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
while (!getOverflowProcessor().canBeClosed()) {
try {
LOGGER.info(
- "The filenode processor {} prepares for merge, the overflow
{} can't be closed, "
- + "wait 100ms,",
- getProcessorName(), getProcessorName());
+ "The filenode processor {} prepares for merge, the overflow {}
can't be closed, "
+ + "wait 100ms,",
+ getProcessorName(), getProcessorName());
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.info("The filenode processor {} prepares for merge, closes the
overflow processor",
- getProcessorName());
+ getProcessorName());
getOverflowProcessor().close();
} catch (FileNodeProcessorException | OverflowProcessorException |
IOException e) {
e.printStackTrace();
@@ -1066,7 +1066,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
Map<String, Long> startTimeMap = emptyIntervalFileNode.getStartTimeMap();
if (emptyIntervalFileNode.overflowChangeType !=
OverflowChangeType.NO_CHANGE) {
Iterator<Entry<String, Long>> iterator =
emptyIntervalFileNode.getEndTimeMap().entrySet()
- .iterator();
+ .iterator();
while (iterator.hasNext()) {
Entry<String, Long> entry = iterator.next();
String deviceId = entry.getKey();
@@ -1109,7 +1109,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
writeStoreToDisk(fileNodeProcessorStore);
} catch (FileNodeProcessorException e) {
LOGGER.error("The filenode processor {} writes restore information
error when merging.",
- getProcessorName(), e);
+ getProcessorName(), e);
writeUnlock();
throw new FileNodeProcessorException(e);
}
@@ -1131,12 +1131,12 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
overflowProcessor.switchWorkToMerge();
} catch (IOException e) {
LOGGER.error("The filenode processor {} can't switch overflow processor
from work to merge.",
- getProcessorName(), e);
+ getProcessorName(), e);
writeUnlock();
throw new FileNodeProcessorException(e);
}
LOGGER.info("The filenode processor {} switches from {} to {}.",
getProcessorName(),
- FileNodeProcessorStatus.NONE,
FileNodeProcessorStatus.MERGING_WRITE);
+ FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
writeUnlock();
// query tsfile data and overflow data, and merge them
@@ -1149,40 +1149,40 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
String filePathBeforeMerge = backupIntervalFile.getRelativePath();
try {
LOGGER.info(
- "The filenode processor {} begins merging the {}/{}
tsfile[{}] with overflow file, "
- + "the process is {}%",
- getProcessorName(), numOfMergeFiles, allNeedMergeFiles,
filePathBeforeMerge,
- (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) *
100));
+ "The filenode processor {} begins merging the {}/{} tsfile[{}]
with overflow file, "
+ + "the process is {}%",
+ getProcessorName(), numOfMergeFiles, allNeedMergeFiles,
filePathBeforeMerge,
+ (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) *
100));
long startTime = System.currentTimeMillis();
String newFile = queryAndWriteDataForMerge(backupIntervalFile);
long endTime = System.currentTimeMillis();
long timeConsume = endTime - startTime;
ZonedDateTime startDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(endTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The fileNode processor {} has merged the {}/{}
tsfile[{}->{}] over, "
- + "start time of merge is {}, end time of merge is
{}, time consumption is {}ms,"
- + " the process is {}%",
- getProcessorName(), numOfMergeFiles, allNeedMergeFiles,
filePathBeforeMerge, newFile,
- startDateTime, endDateTime, timeConsume,
- (int) (numOfMergeFiles) / (float) allNeedMergeFiles * 100);
+ "The fileNode processor {} has merged the {}/{} tsfile[{}->{}]
over, "
+ + "start time of merge is {}, end time of merge is {}, time
consumption is {}ms,"
+ + " the process is {}%",
+ getProcessorName(), numOfMergeFiles, allNeedMergeFiles,
filePathBeforeMerge, newFile,
+ startDateTime, endDateTime, timeConsume,
+ (int) (numOfMergeFiles) / (float) allNeedMergeFiles * 100);
} catch (IOException | WriteProcessException | PathErrorException e) {
LOGGER.error("Merge: query and write data error.", e);
throw new FileNodeProcessorException(e);
}
} else if (backupIntervalFile.overflowChangeType ==
OverflowChangeType.MERGING_CHANGE) {
LOGGER.error("The overflowChangeType of backupIntervalFile must not be
{}",
- OverflowChangeType.MERGING_CHANGE);
+ OverflowChangeType.MERGING_CHANGE);
// handle this error, throw one runtime exception
throw new FileNodeProcessorException(
- "The overflowChangeType of backupIntervalFile must not be "
- + OverflowChangeType.MERGING_CHANGE);
+ "The overflowChangeType of backupIntervalFile must not be "
+ + OverflowChangeType.MERGING_CHANGE);
} else {
LOGGER.debug(
- "The filenode processor {} is merging, the interval file {}
doesn't need to be merged.",
- getProcessorName(), backupIntervalFile.getRelativePath());
+ "The filenode processor {} is merging, the interval file {}
doesn't need to be merged.",
+ getProcessorName(), backupIntervalFile.getRelativePath());
}
}
@@ -1204,8 +1204,8 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
result.add(emptyIntervalFileNode.backUp());
if (!newFileNodes.isEmpty()) {
throw new FileNodeProcessorException(
- String.format("The status of empty file is %s, but the new
file list is not empty",
- emptyIntervalFileNode.overflowChangeType));
+ String.format("The status of empty file is %s, but the new file
list is not empty",
+ emptyIntervalFileNode.overflowChangeType));
}
return result;
}
@@ -1234,15 +1234,15 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
IntervalFileNode node = new IntervalFileNode(startTimeMap,
endTimeMap,
- intervalFileNode.overflowChangeType,
intervalFileNode.getBaseDirIndex(),
- intervalFileNode.getRelativePath());
+ intervalFileNode.overflowChangeType,
intervalFileNode.getBaseDirIndex(),
+ intervalFileNode.getRelativePath());
result.add(node);
}
}
} else {
LOGGER.error("No file was changed when merging, the filenode is {}",
getProcessorName());
throw new FileNodeProcessorException(
- "No file was changed when merging, the filenode is " +
getProcessorName());
+ "No file was changed when merging, the filenode is " +
getProcessorName());
}
return result;
}
@@ -1301,9 +1301,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
*/
private void switchMergeToWaitingv2(List<IntervalFileNode>
backupIntervalFiles, boolean needEmpty)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
LOGGER.info("The status of filenode processor {} switches from {} to {}.",
getProcessorName(),
- FileNodeProcessorStatus.MERGING_WRITE,
FileNodeProcessorStatus.WAITING);
+ FileNodeProcessorStatus.MERGING_WRITE,
FileNodeProcessorStatus.WAITING);
writeLock();
try {
oldMultiPassTokenSet = newMultiPassTokenSet;
@@ -1341,7 +1341,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
temp.overflowChangeType = OverflowChangeType.CHANGED;
} else {
changeTypeToChanged(deviceId, newFile.getStartTime(deviceId),
- newFile.getEndTime(deviceId));
+ newFile.getEndTime(deviceId));
}
}
}
@@ -1376,11 +1376,11 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
writeStoreToDisk(fileNodeProcessorStore);
} catch (FileNodeProcessorException e) {
LOGGER.error(
- "Merge: failed to write filenode information to revocery
file, the filenode is {}.",
- getProcessorName(), e);
+ "Merge: failed to write filenode information to revocery file,
the filenode is {}.",
+ getProcessorName(), e);
throw new FileNodeProcessorException(
- "Merge: write filenode information to revocery file failed,
the filenode is "
- + getProcessorName());
+ "Merge: write filenode information to revocery file failed, the
filenode is "
+ + getProcessorName());
}
}
} finally {
@@ -1389,15 +1389,15 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
private void switchWaitingToWorkingv2(List<IntervalFileNode>
backupIntervalFiles)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
LOGGER.info("The status of filenode processor {} switches from {} to {}.",
getProcessorName(),
- FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
+ FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
if (oldMultiPassLock != null) {
LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple
Pass Lock is {}",
- oldMultiPassTokenSet,
- oldMultiPassLock);
+ oldMultiPassTokenSet,
+ oldMultiPassLock);
oldMultiPassLock.writeLock().lock();
}
try {
@@ -1410,7 +1410,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
List<File> bufferwriteDirList = new ArrayList<>();
for (String bufferwriteDirPath : bufferwriteDirPathList) {
if (bufferwriteDirPath.length() > 0
- && bufferwriteDirPath.charAt(bufferwriteDirPath.length() -
1) != File.separatorChar) {
+ && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) !=
File.separatorChar) {
bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
}
bufferwriteDirPath = bufferwriteDirPath + getProcessorName();
@@ -1465,10 +1465,10 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
} catch (IOException e) {
LOGGER.info(
- "The filenode processor {} encountered an error when its "
- + "status switched from {} to {}.",
- getProcessorName(), FileNodeProcessorStatus.NONE,
FileNodeProcessorStatus.MERGING_WRITE,
- e);
+ "The filenode processor {} encountered an error when its "
+ + "status switched from {} to {}.",
+ getProcessorName(), FileNodeProcessorStatus.NONE,
FileNodeProcessorStatus.MERGING_WRITE,
+ e);
throw new FileNodeProcessorException(e);
} finally {
writeUnlock();
@@ -1484,15 +1484,15 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
private TSRecord constructTsRecord(TimeValuePair timeValuePair, String
deviceId,
- String measurementId) {
+ String measurementId) {
TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deviceId);
record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(),
measurementId,
- timeValuePair.getValue().getValue().toString()));
+ timeValuePair.getValue().getValue().toString()));
return record;
}
private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile)
- throws IOException, WriteProcessException,
FileNodeProcessorException, PathErrorException {
+ throws IOException, WriteProcessException, FileNodeProcessorException,
PathErrorException {
Map<String, Long> startTimeMap = new HashMap<>();
Map<String, Long> endTimeMap = new HashMap<>();
@@ -1505,15 +1505,15 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// losing some modification.
mergeDeleteLock.lock();
QueryContext context = new QueryContext();
- try{
- for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
- // query one deviceId
- List<Path> pathList = new ArrayList<>();
- boolean isRowGroupHasData = false;
- ChunkGroupFooter footer = null;
- int numOfChunk = 0;
- long startPos = -1;
- int recordCount = 0;
+ try {
+ for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
+ // query one deviceId
+ List<Path> pathList = new ArrayList<>();
+ boolean isRowGroupHasData = false;
+ ChunkGroupFooter footer = null;
+ int numOfChunk = 0;
+ long startPos = -1;
+ int recordCount = 0;
try {
List<String> pathStrings =
mManager.getLeafNodePathInNextLevel(deviceId);
for (String string : pathStrings) {
@@ -1532,31 +1532,31 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
String measurementId = path.getMeasurement();
TSDataType dataType = mManager.getSeriesType(path.getFullPath());
OverflowSeriesDataSource overflowSeriesDataSource =
overflowProcessor.queryMerge(deviceId,
- measurementId, dataType, true, context);
+ measurementId, dataType, true, context);
Filter timeFilter = FilterFactory
-
.and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
-
TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
+ .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
+ TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new
SingleSeriesExpression(path, timeFilter);
IReader seriesReader = SeriesReaderFactory.getInstance()
- .createSeriesReaderForMerge(backupIntervalFile,
- overflowSeriesDataSource, seriesFilter, context);
+ .createSeriesReaderForMerge(backupIntervalFile,
+ overflowSeriesDataSource, seriesFilter, context);
try {
if (!seriesReader.hasNext()) {
LOGGER.debug(
- "The time-series {} has no data with the filter {} in
the filenode processor {}",
- path, seriesFilter, getProcessorName());
+ "The time-series {} has no data with the filter {} in the
filenode processor {}",
+ path, seriesFilter, getProcessorName());
} else {
numOfChunk++;
TimeValuePair timeValuePair = seriesReader.next();
if (fileIoWriter == null) {
baseDir = directories.getNextFolderForTsfile();
fileName = String.valueOf(timeValuePair.getTimestamp()
- + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR +
System.currentTimeMillis());
+ + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR +
System.currentTimeMillis());
outputPath = constructOutputFilePath(baseDir,
getProcessorName(), fileName);
fileName = getProcessorName() + File.separatorChar + fileName;
fileIoWriter = new TsFileIOWriter(new File(outputPath));
mergingModification = new ModificationFile(outputPath
- + ModificationFile.FILE_SUFFIX);
+ + ModificationFile.FILE_SUFFIX);
mergeDeleteLock.unlock();
}
if (!isRowGroupHasData) {
@@ -1572,11 +1572,11 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
ChunkBuffer pageWriter = new ChunkBuffer(measurementSchema);
int pageSizeThreshold = TsFileConf.pageSizeInByte;
ChunkWriterImpl seriesWriterImpl = new
ChunkWriterImpl(measurementSchema, pageWriter,
- pageSizeThreshold);
+ pageSizeThreshold);
// write the series data
recordCount += writeOneSeries(deviceId, measurementId,
seriesWriterImpl, dataType,
- seriesReader,
- startTimeMap, endTimeMap, timeValuePair);
+ seriesReader,
+ startTimeMap, endTimeMap, timeValuePair);
// flush the series data
seriesWriterImpl.writeToFileWriter(fileIoWriter);
}
@@ -1594,8 +1594,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
} finally {
- if (mergeDeleteLock.isLocked())
+ if (mergeDeleteLock.isLocked()) {
mergeDeleteLock.unlock();
+ }
}
if (fileIoWriter != null) {
@@ -1612,9 +1613,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
private int writeOneSeries(String deviceId, String measurement,
ChunkWriterImpl seriesWriterImpl,
- TSDataType dataType, IReader seriesReader,
Map<String, Long> startTimeMap,
- Map<String, Long> endTimeMap,
- TimeValuePair timeValuePair) throws IOException {
+ TSDataType dataType, IReader seriesReader, Map<String, Long>
startTimeMap,
+ Map<String, Long> endTimeMap,
+ TimeValuePair timeValuePair) throws IOException {
int count = 0;
long startTime = -1;
long endTime = -1;
@@ -1634,7 +1635,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBoolean());
+ .write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBoolean());
}
if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) <
endTime) {
endTimeMap.put(deviceId, endTime);
@@ -1715,7 +1716,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getDouble());
+ .write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getDouble());
}
if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) <
endTime) {
endTimeMap.put(deviceId, endTime);
@@ -1736,7 +1737,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBinary());
+ .write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBinary());
}
if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) <
endTime) {
endTimeMap.put(deviceId, endTime);
@@ -1757,7 +1758,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
File dataDir = new File(baseDir);
if (!dataDir.exists()) {
LOGGER.warn("The bufferwrite processor data dir doesn't exists, create
new directory {}",
- baseDir);
+ baseDir);
dataDir.mkdirs();
}
File outputFile = new File(dataDir, fileName);
@@ -1781,7 +1782,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
private FileSchema getFileSchemaFromColumnSchema(List<ColumnSchema>
schemaList, String deviceType)
- throws WriteProcessException {
+ throws WriteProcessException {
JSONArray rowGroup = new JSONArray();
for (ColumnSchema col : schemaList) {
@@ -1819,8 +1820,8 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
} else {
LOGGER
- .info("The filenode {} can't be closed, because it can't
get oldMultiPassLock {}",
- getProcessorName(), oldMultiPassLock);
+ .info("The filenode {} can't be closed, because it can't get
oldMultiPassLock {}",
+ getProcessorName(), oldMultiPassLock);
return false;
}
} else {
@@ -1831,13 +1832,13 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
} else {
LOGGER.info("The filenode {} can't be closed, because it can't get
newMultiPassLock {}",
- getProcessorName(), newMultiPassLock);
+ getProcessorName(), newMultiPassLock);
return false;
}
} else {
LOGGER.info("The filenode {} can't be closed, because the filenode
status is {}",
- getProcessorName(),
- isMerging);
+ getProcessorName(),
+ isMerging);
return false;
}
}
@@ -1862,7 +1863,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
while (!bufferWriteProcessor.canBeClosed()) {
try {
LOGGER.info("The bufferwrite {} can't be closed, wait 100ms",
- bufferWriteProcessor.getProcessorName());
+ bufferWriteProcessor.getProcessorName());
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
@@ -1887,7 +1888,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
while (!overflowProcessor.canBeClosed()) {
try {
LOGGER.info("The overflow {} can't be closed, wait 100ms",
- overflowProcessor.getProcessorName());
+ overflowProcessor.getProcessorName());
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
// ignore the interrupted exception
@@ -1954,14 +1955,14 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
private void writeStoreToDisk(FileNodeProcessorStore fileNodeProcessorStore)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
synchronized (fileNodeRestoreFilePath) {
SerializeUtil<FileNodeProcessorStore> serializeUtil = new
SerializeUtil<>();
try {
serializeUtil.serialize(fileNodeProcessorStore,
fileNodeRestoreFilePath);
LOGGER.debug("The filenode processor {} writes restore information to
the restore file",
- getProcessorName());
+ getProcessorName());
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
@@ -1992,9 +1993,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
/**
* Delete data whose timestamp <= 'timestamp' and belong to timeseries
deviceId.measurementId.
*
- * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
+ * @param timestamp the delete range is (0, timestamp].
*/
public void delete(String deviceId, String measurementId, long timestamp)
throws IOException {
// TODO: how to avoid partial deletion?
@@ -2006,7 +2007,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
try {
String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
Deletion deletion = new Deletion(fullPath, version, timestamp);
if (mergingModification != null) {
mergingModification.write(deletion);
@@ -2031,7 +2032,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
- List<ModificationFile> updatedModFiles)
throws IOException {
+ List<ModificationFile> updatedModFiles) throws IOException {
if (currentIntervalFileNode != null &&
currentIntervalFileNode.containsDevice(deviceId)) {
currentIntervalFileNode.getModFile().write(deletion);
updatedModFiles.add(currentIntervalFileNode.getModFile());
@@ -2046,13 +2047,12 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
/**
- * Similar to delete(), but only deletes data in BufferWrite.
- * Only used by WAL recovery.
+ * Similar to delete(), but only deletes data in BufferWrite. Only used by
WAL recovery.
*/
public void deleteBufferWrite(String deviceId, String measurementId, long
timestamp)
- throws IOException {
+ throws IOException {
String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
long version = versionController.nextVersion();
Deletion deletion = new Deletion(fullPath, version, timestamp);
@@ -2071,11 +2071,10 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
/**
- * Similar to delete(), but only deletes data in Overflow.
- * Only used by WAL recovery.
+ * Similar to delete(), but only deletes data in Overflow. Only used by WAL
recovery.
*/
public void deleteOverflow(String deviceId, String measurementId, long
timestamp)
- throws IOException {
+ throws IOException {
long version = versionController.nextVersion();
OverflowProcessor overflowProcessor =
getOverflowProcessor(getProcessorName());