This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch debug_apply_stuck
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/debug_apply_stuck by this push:
new 56ef1bf apply spotless
56ef1bf is described below
commit 56ef1bfd3d76f596a692f8e1d437cbd2222e0e51
Author: jt <[email protected]>
AuthorDate: Mon Apr 26 11:21:38 2021 +0800
apply spotless
---
.../engine/storagegroup/StorageGroupProcessor.java | 193 +++++++++------------
1 file changed, 82 insertions(+), 111 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 35cd2a2..053c8a3 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -150,13 +150,9 @@ public class StorageGroupProcessor {
private static final int MERGE_MOD_START_VERSION_NUM = 1;
private static final Logger logger =
LoggerFactory.getLogger(StorageGroupProcessor.class);
- /**
- * indicating the file to be loaded already exists locally.
- */
+ /** indicating the file to be loaded already exists locally. */
private static final int POS_ALREADY_EXIST = -2;
- /**
- * indicating the file to be loaded overlap with some files.
- */
+ /** indicating the file to be loaded overlap with some files. */
private static final int POS_OVERLAP = -3;
private final boolean enableMemControl = config.isEnableMemControl();
@@ -167,25 +163,17 @@ public class StorageGroupProcessor {
* partitionLatestFlushedTimeForEachDevice)
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
- /**
- * closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done.
- */
+ /** closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done. */
private final Object closeStorageGroupCondition = new Object();
/**
* avoid some tsfileResource is changed (e.g., from unsealed to sealed) when
a query is executed.
*/
private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
- /**
- * time partition id in the storage group -> tsFileProcessor for this time
partition
- */
+ /** time partition id in the storage group -> tsFileProcessor for this time
partition */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors =
new TreeMap<>();
- /**
- * time partition id in the storage group -> tsFileProcessor for this time
partition
- */
+ /** time partition id in the storage group -> tsFileProcessor for this time
partition */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors
= new TreeMap<>();
- /**
- * compactionMergeWorking is used to wait for last compaction to be done.
- */
+ /** compactionMergeWorking is used to wait for last compaction to be done. */
private volatile boolean compactionMergeWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -216,15 +204,14 @@ public class StorageGroupProcessor {
*/
private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice
= new HashMap<>();
- /**
- * used to record the latest flush time while upgrading and inserting
- */
+ /** used to record the latest flush time while upgrading and inserting */
private Map<Long, Map<String, Long>>
newlyFlushedPartitionLatestFlushedTimeForEachDevice =
new HashMap<>();
/**
* global mapping of device -> largest timestamp of the latest memtable to *
be submitted to
* asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to
maintain global
- * latestFlushedTime of devices and will be updated along with
partitionLatestFlushedTimeForEachDevice
+ * latestFlushedTime of devices and will be updated along with
+ * partitionLatestFlushedTimeForEachDevice
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new
HashMap<>();
@@ -284,9 +271,7 @@ public class StorageGroupProcessor {
// DEFAULT_POOL_TRIM_INTERVAL_MILLIS
private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
- /**
- * get the direct byte buffer from pool, each fetch contains two ByteBuffer
- */
+ /** get the direct byte buffer from pool, each fetch contains two ByteBuffer
*/
public ByteBuffer[] getWalDirectByteBuffer() {
ByteBuffer[] res = new ByteBuffer[2];
synchronized (walByteBufferPool) {
@@ -325,15 +310,16 @@ public class StorageGroupProcessor {
timeWhenPoolNotEmpty = Long.MAX_VALUE;
}
}
- logger
- .info("Thread {} retrieves a WAL buffer {}, pool size: {}/{}",
Thread.currentThread(), res,
- walByteBufferPool.size(), currentWalPoolSize);
+ logger.info(
+ "Thread {} retrieves a WAL buffer {}, pool size: {}/{}",
+ Thread.currentThread(),
+ res,
+ walByteBufferPool.size(),
+ currentWalPoolSize);
return res;
}
- /**
- * put the byteBuffer back to pool
- */
+ /** put the byteBuffer back to pool */
public void releaseWalBuffer(ByteBuffer[] byteBuffers) {
for (ByteBuffer byteBuffer : byteBuffers) {
byteBuffer.clear();
@@ -346,14 +332,16 @@ public class StorageGroupProcessor {
walByteBufferPool.addLast(byteBuffers[0]);
walByteBufferPool.addLast(byteBuffers[1]);
walByteBufferPool.notifyAll();
- logger.info("Thread {} releases a WAL buffer {}, pool size: {}/{}",
Thread.currentThread(),
- byteBuffers, walByteBufferPool.size(), currentWalPoolSize);
+ logger.info(
+ "Thread {} releases a WAL buffer {}, pool size: {}/{}",
+ Thread.currentThread(),
+ byteBuffers,
+ walByteBufferPool.size(),
+ currentWalPoolSize);
}
}
- /**
- * trim the size of the pool and release the memory of needless direct byte
buffer
- */
+ /** trim the size of the pool and release the memory of needless direct byte
buffer */
private void trimTask() {
synchronized (walByteBufferPool) {
int expectedSize =
@@ -370,8 +358,11 @@ public class StorageGroupProcessor {
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
currentWalPoolSize -= 2;
- logger.info("Thread {} trims a WAL buffer , pool size: {}/{}",
Thread.currentThread(),
- walByteBufferPool.size(), currentWalPoolSize);
+ logger.info(
+ "Thread {} trims a WAL buffer , pool size: {}/{}",
+ Thread.currentThread(),
+ walByteBufferPool.size(),
+ currentWalPoolSize);
}
}
}
@@ -379,9 +370,9 @@ public class StorageGroupProcessor {
/**
* construct a storage group processor
*
- * @param systemDir system dir path
- * @param virtualStorageGroupId virtual storage group id e.g. 1
- * @param fileFlushPolicy file flush policy
+ * @param systemDir system dir path
+ * @param virtualStorageGroupId virtual storage group id e.g. 1
+ * @param fileFlushPolicy file flush policy
* @param logicalStorageGroupName logical storage group name e.g. root.sg1
*/
public StorageGroupProcessor(
@@ -829,8 +820,8 @@ public class StorageGroupProcessor {
boolean isSequence =
insertRowPlan.getTime()
> partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
+ .get(timePartitionId)
+ .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
// is unsequence and user set config to discard out of order data
if (!isSequence
@@ -924,7 +915,7 @@ public class StorageGroupProcessor {
||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
+ insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
// re initialize
@@ -945,7 +936,7 @@ public class StorageGroupProcessor {
if
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletPlan, before, loc, false, results,
beforeTimePartition)
+ insertTabletPlan, before, loc, false, results,
beforeTimePartition)
&& noFailure;
}
before = loc;
@@ -958,10 +949,10 @@ public class StorageGroupProcessor {
// do not forget last part
if (before < loc
&& (isSequence
- ||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+ ||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
+ insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
long globalLatestFlushedTime =
@@ -980,9 +971,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * @return whether the given time falls in ttl
- */
+ /** @return whether the given time falls in ttl */
private boolean isAlive(long time) {
  // When dataTTL is Long.MAX_VALUE every timestamp is treated as alive,
  // and the wall clock is not consulted at all.
  if (dataTTL == Long.MAX_VALUE) {
    return true;
  }
  // Otherwise the timestamp is alive while its age, measured against the
  // current wall-clock time, does not exceed dataTTL.
  long age = System.currentTimeMillis() - time;
  return age <= dataTTL;
}
@@ -992,11 +981,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in
insertTabletPlan
- * @param end end index of rows to be inserted in
insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(
@@ -1036,9 +1025,9 @@ public class StorageGroupProcessor {
// try to update the latest time of the device of this tsRecord
if (sequence
&& latestTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
- < insertTabletPlan.getTimes()[end - 1]) {
+ .get(timePartitionId)
+ .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
+ < insertTabletPlan.getTimes()[end - 1]) {
latestTimeForEachDevice
.get(timePartitionId)
.put(insertTabletPlan.getDeviceId().getFullPath(),
insertTabletPlan.getTimes()[end - 1]);
@@ -1090,8 +1079,8 @@ public class StorageGroupProcessor {
// try to update the latest time of the device of this tsRecord
if (latestTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
+ .get(timePartitionId)
+ .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
< insertRowPlan.getTime()) {
latestTimeForEachDevice
.get(timePartitionId)
@@ -1174,9 +1163,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(
long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
boolean sequence)
@@ -1313,9 +1302,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * thread-safety should be ensured by caller
- */
+ /** thread-safety should be ensured by caller */
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
tsFileProcessor) {
// for sequence tsfile, we update the endTimeMap only when the file is
prepared to be closed.
// for unsequence tsfile, we have maintained the endTimeMap when an
insertion comes.
@@ -1353,9 +1340,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * delete the storageGroup's own folder in folder data/system/storage_groups
- */
+ /** delete the storageGroup's own folder in folder
data/system/storage_groups */
public void deleteFolder(String systemDir) {
logger.info(
"{} will close all files for deleting data folder {}",
@@ -1446,9 +1431,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * Iterate each TsFile and try to lock and remove those out of TTL.
- */
+ /** Iterate each TsFile and try to lock and remove those out of TTL. */
public synchronized void checkFilesTTL() {
if (dataTTL == Long.MAX_VALUE) {
logger.debug(
@@ -1515,9 +1498,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * This method will be blocked until all tsfile processors are closed.
- */
+ /** This method will be blocked until all tsfile processors are closed. */
public void syncCloseAllWorkingTsFileProcessors() {
synchronized (closeStorageGroupCondition) {
try {
@@ -1717,9 +1698,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param path the timeseries path of the to be deleted.
+ * @param path the timeseries path of the to be deleted.
* @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(PartialPath path, long startTime, long endTime, long
planIndex)
throws IOException {
@@ -1930,7 +1911,7 @@ public class StorageGroupProcessor {
/**
* update latest flush time for partition id
*
- * @param partitionId partition id
+ * @param partitionId partition id
* @param latestFlushTime latest flush time
* @return true if update latest flush time success
*/
@@ -1965,9 +1946,7 @@ public class StorageGroupProcessor {
return true;
}
- /**
- * used for upgrading
- */
+ /** used for upgrading */
public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
long partitionId, String deviceId, long time) {
newlyFlushedPartitionLatestFlushedTimeForEachDevice
@@ -1975,9 +1954,7 @@ public class StorageGroupProcessor {
.compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
- /**
- * put the memtable back to the MemTablePool and make the metadata in writer
visible
- */
+ /** put the memtable back to the MemTablePool and make the metadata in
writer visible */
// TODO please consider concurrency with query and insert method.
private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor
tsFileProcessor)
throws TsFileProcessorException {
@@ -2019,7 +1996,7 @@ public class StorageGroupProcessor {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
tsFileManagement
- .new
CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack,
timePartition));
} catch (IOException | RejectedExecutionException e) {
this.closeCompactionMergeCallBack();
logger.error(
@@ -2034,9 +2011,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * close compaction merge callback, to release some locks
- */
+ /** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack() {
  // Reset the volatile compactionMergeWorking flag; the class uses this flag
  // to wait for the last compaction to be done.
  compactionMergeWorking = false;
}
@@ -2318,12 +2293,11 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into
- * them.
+ * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into them.
*
* @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one
to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file an insertion position
i >= -1 if the new
- * file can be inserted between [i, i+1]
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion
position i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(
TsFileResource newTsFileResource,
@@ -2374,7 +2348,7 @@ public class StorageGroupProcessor {
* Compare each device in the two files to find the time relation of them.
*
* @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is
partially older than
- * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer
than fileB (B < A)
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally
newer than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB)
{
boolean hasPre = false, hasSubsequence = false;
@@ -2495,8 +2469,7 @@ public class StorageGroupProcessor {
* <p>The sorting rules for tsfile names @see {@link this#compareFileName},
we can restore the
* list based on the file name and ensure the correctness of the order, so
there are three cases.
*
- * <p>1. The tsfile is to be inserted in the first place of the list. If the
timestamp in the
- * file
+ * <p>1. The tsfile is to be inserted in the first place of the list. If the
timestamp in the file
* name is less than the timestamp in the file name of the first tsfile in
the list, then the file
* name is legal and the file name is returned directly. Otherwise, its
timestamp can be set to
* half of the timestamp value in the file name of the first tsfile in the
list , and the version
@@ -2512,9 +2485,9 @@ public class StorageGroupProcessor {
* name and returns directly; otherwise, the time stamp is the mean of the
timestamps of the two
* files, the version number is the version number in the tsfile with a
larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files
[insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(
@@ -2547,16 +2520,16 @@ public class StorageGroupProcessor {
}
/**
- * Update latest time in latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
- * @UsedBy sync module, load external tsfile module.
+ * Update latest time in latestTimeForEachDevice and
+ * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
*/
private void updateLatestTimeMap(TsFileResource newTsFileResource) {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = StorageEngine.getTimePartition(endTime);
if (!latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .containsKey(device)
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .containsKey(device)
|| latestTimeForEachDevice.get(timePartitionId).get(device) <
endTime) {
latestTimeForEachDevice.get(timePartitionId).put(device, endTime);
}
@@ -2578,8 +2551,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully @UsedBy sync module, load external
tsfile module.
*/
@@ -2687,7 +2660,7 @@ public class StorageGroupProcessor {
*
* @param tsfieToBeDeleted tsfile to be deleted
* @return whether the file to be deleted exists. @UsedBy sync module, load
external tsfile
- * module.
+ * module.
*/
public boolean deleteTsfile(File tsfieToBeDeleted) {
tsFileManagement.writeLock();
@@ -2821,14 +2794,14 @@ public class StorageGroupProcessor {
* "tsFileResource" have the same plan indexes as the local one.
*
* @return true if any file contains plans with indexes no less than the max
plan index of
- * "tsFileResource", otherwise false.
+ * "tsFileResource", otherwise false.
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
// examine working processor first as they have the largest plan index
return isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
|| isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getSequenceFileTreeSet())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getUnSequenceFileList());
}
@@ -2876,9 +2849,7 @@ public class StorageGroupProcessor {
return false;
}
- /**
- * remove all partitions that satisfy a filter.
- */
+ /** remove all partitions that satisfy a filter. */
public void removePartitions(TimePartitionFilter filter) {
// this requires blocking all other activities
tsFileManagement.writeLock();
@@ -2961,8 +2932,8 @@ public class StorageGroupProcessor {
isSequence =
plan.getTime()
> partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(plan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
+ .get(timePartitionId)
+ .getOrDefault(plan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
}
// is unsequence and user set config to discard out of order data
if (!isSequence