This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch debug_apply_stuck
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/debug_apply_stuck by this push:
new 055b4b8 add wal related logs
055b4b8 is described below
commit 055b4b823cde8fd255f578870f8f17edc318c415
Author: jt <[email protected]>
AuthorDate: Mon Apr 26 11:13:39 2021 +0800
add wal related logs
---
.../engine/storagegroup/StorageGroupProcessor.java | 176 +++++++++++++--------
1 file changed, 111 insertions(+), 65 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d828ef8..35cd2a2 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -150,9 +150,13 @@ public class StorageGroupProcessor {
private static final int MERGE_MOD_START_VERSION_NUM = 1;
private static final Logger logger =
LoggerFactory.getLogger(StorageGroupProcessor.class);
- /** indicating the file to be loaded already exists locally. */
+ /**
+ * indicating the file to be loaded already exists locally.
+ */
private static final int POS_ALREADY_EXIST = -2;
- /** indicating the file to be loaded overlap with some files. */
+ /**
+ * indicating the file to be loaded overlap with some files.
+ */
private static final int POS_OVERLAP = -3;
private final boolean enableMemControl = config.isEnableMemControl();
@@ -163,17 +167,25 @@ public class StorageGroupProcessor {
* partitionLatestFlushedTimeForEachDevice)
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
- /** closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done. */
+ /**
+ * closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done.
+ */
private final Object closeStorageGroupCondition = new Object();
/**
* avoid some tsfileResource is changed (e.g., from unsealed to sealed) when
a query is executed.
*/
private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
- /** time partition id in the storage group -> tsFileProcessor for this time
partition */
+ /**
+ * time partition id in the storage group -> tsFileProcessor for this time
partition
+ */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors =
new TreeMap<>();
- /** time partition id in the storage group -> tsFileProcessor for this time
partition */
+ /**
+ * time partition id in the storage group -> tsFileProcessor for this time
partition
+ */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors
= new TreeMap<>();
- /** compactionMergeWorking is used to wait for last compaction to be done. */
+ /**
+ * compactionMergeWorking is used to wait for last compaction to be done.
+ */
private volatile boolean compactionMergeWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -204,14 +216,15 @@ public class StorageGroupProcessor {
*/
private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice
= new HashMap<>();
- /** used to record the latest flush time while upgrading and inserting */
+ /**
+ * used to record the latest flush time while upgrading and inserting
+ */
private Map<Long, Map<String, Long>>
newlyFlushedPartitionLatestFlushedTimeForEachDevice =
new HashMap<>();
/**
* global mapping of device -> largest timestamp of the latest memtable to *
be submitted to
* asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to
maintain global
- * latestFlushedTime of devices and will be updated along with
- * partitionLatestFlushedTimeForEachDevice
+ * latestFlushedTime of devices and will be updated along with
partitionLatestFlushedTimeForEachDevice
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new
HashMap<>();
@@ -271,7 +284,9 @@ public class StorageGroupProcessor {
// DEFAULT_POOL_TRIM_INTERVAL_MILLIS
private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
- /** get the direct byte buffer from pool, each fetch contains two ByteBuffer
*/
+ /**
+ * get the direct byte buffer from pool, each fetch contains two ByteBuffer
+ */
public ByteBuffer[] getWalDirectByteBuffer() {
ByteBuffer[] res = new ByteBuffer[2];
synchronized (walByteBufferPool) {
@@ -310,10 +325,15 @@ public class StorageGroupProcessor {
timeWhenPoolNotEmpty = Long.MAX_VALUE;
}
}
+ logger
+ .info("Thread {} retrieves a WAL buffer {}, pool size: {}/{}",
Thread.currentThread(), res,
+ walByteBufferPool.size(), currentWalPoolSize);
return res;
}
- /** put the byteBuffer back to pool */
+ /**
+ * put the byteBuffer back to pool
+ */
public void releaseWalBuffer(ByteBuffer[] byteBuffers) {
for (ByteBuffer byteBuffer : byteBuffers) {
byteBuffer.clear();
@@ -326,10 +346,14 @@ public class StorageGroupProcessor {
walByteBufferPool.addLast(byteBuffers[0]);
walByteBufferPool.addLast(byteBuffers[1]);
walByteBufferPool.notifyAll();
+ logger.info("Thread {} releases a WAL buffer {}, pool size: {}/{}",
Thread.currentThread(),
+ byteBuffers, walByteBufferPool.size(), currentWalPoolSize);
}
}
- /** trim the size of the pool and release the memory of needless direct byte
buffer */
+ /**
+ * trim the size of the pool and release the memory of needless direct byte
buffer
+ */
private void trimTask() {
synchronized (walByteBufferPool) {
int expectedSize =
@@ -346,6 +370,8 @@ public class StorageGroupProcessor {
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
currentWalPoolSize -= 2;
+ logger.info("Thread {} trims a WAL buffer , pool size: {}/{}",
Thread.currentThread(),
+ walByteBufferPool.size(), currentWalPoolSize);
}
}
}
@@ -353,9 +379,9 @@ public class StorageGroupProcessor {
/**
* constrcut a storage group processor
*
- * @param systemDir system dir path
- * @param virtualStorageGroupId virtual storage group id e.g. 1
- * @param fileFlushPolicy file flush policy
+ * @param systemDir system dir path
+ * @param virtualStorageGroupId virtual storage group id e.g. 1
+ * @param fileFlushPolicy file flush policy
* @param logicalStorageGroupName logical storage group name e.g. root.sg1
*/
public StorageGroupProcessor(
@@ -803,8 +829,8 @@ public class StorageGroupProcessor {
boolean isSequence =
insertRowPlan.getTime()
> partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
+ .get(timePartitionId)
+ .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
// is unsequence and user set config to discard out of order data
if (!isSequence
@@ -898,7 +924,7 @@ public class StorageGroupProcessor {
||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
+ insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
// re initialize
@@ -919,7 +945,7 @@ public class StorageGroupProcessor {
if
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletPlan, before, loc, false, results,
beforeTimePartition)
+ insertTabletPlan, before, loc, false, results,
beforeTimePartition)
&& noFailure;
}
before = loc;
@@ -932,10 +958,10 @@ public class StorageGroupProcessor {
// do not forget last part
if (before < loc
&& (isSequence
- ||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+ ||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
+ insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
long globalLatestFlushedTime =
@@ -954,7 +980,9 @@ public class StorageGroupProcessor {
}
}
- /** @return whether the given time falls in ttl */
+ /**
+ * @return whether the given time falls in ttl
+ */
private boolean isAlive(long time) {
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <=
dataTTL;
}
@@ -964,11 +992,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in
insertTabletPlan
+ * @param end end index of rows to be inserted in
insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(
@@ -1008,9 +1036,9 @@ public class StorageGroupProcessor {
// try to update the latest time of the device of this tsRecord
if (sequence
&& latestTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
- < insertTabletPlan.getTimes()[end - 1]) {
+ .get(timePartitionId)
+ .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
+ < insertTabletPlan.getTimes()[end - 1]) {
latestTimeForEachDevice
.get(timePartitionId)
.put(insertTabletPlan.getDeviceId().getFullPath(),
insertTabletPlan.getTimes()[end - 1]);
@@ -1062,8 +1090,8 @@ public class StorageGroupProcessor {
// try to update the latest time of the device of this tsRecord
if (latestTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
+ .get(timePartitionId)
+ .getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE)
< insertRowPlan.getTime()) {
latestTimeForEachDevice
.get(timePartitionId)
@@ -1146,9 +1174,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(
long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
boolean sequence)
@@ -1285,7 +1313,9 @@ public class StorageGroupProcessor {
}
}
- /** thread-safety should be ensured by caller */
+ /**
+ * thread-safety should be ensured by caller
+ */
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
tsFileProcessor) {
// for sequence tsfile, we update the endTimeMap only when the file is
prepared to be closed.
// for unsequence tsfile, we have maintained the endTimeMap when an
insertion comes.
@@ -1323,7 +1353,9 @@ public class StorageGroupProcessor {
}
}
- /** delete the storageGroup's own folder in folder
data/system/storage_groups */
+ /**
+ * delete the storageGroup's own folder in folder data/system/storage_groups
+ */
public void deleteFolder(String systemDir) {
logger.info(
"{} will close all files for deleting data folder {}",
@@ -1414,7 +1446,9 @@ public class StorageGroupProcessor {
}
}
- /** Iterate each TsFile and try to lock and remove those out of TTL. */
+ /**
+ * Iterate each TsFile and try to lock and remove those out of TTL.
+ */
public synchronized void checkFilesTTL() {
if (dataTTL == Long.MAX_VALUE) {
logger.debug(
@@ -1481,7 +1515,9 @@ public class StorageGroupProcessor {
}
}
- /** This method will be blocked until all tsfile processors are closed. */
+ /**
+ * This method will be blocked until all tsfile processors are closed.
+ */
public void syncCloseAllWorkingTsFileProcessors() {
synchronized (closeStorageGroupCondition) {
try {
@@ -1681,9 +1717,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param path the timeseries path of the to be deleted.
+ * @param path the timeseries path of the to be deleted.
* @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(PartialPath path, long startTime, long endTime, long
planIndex)
throws IOException {
@@ -1894,7 +1930,7 @@ public class StorageGroupProcessor {
/**
* update latest flush time for partition id
*
- * @param partitionId partition id
+ * @param partitionId partition id
* @param latestFlushTime lastest flush time
* @return true if update latest flush time success
*/
@@ -1929,7 +1965,9 @@ public class StorageGroupProcessor {
return true;
}
- /** used for upgrading */
+ /**
+ * used for upgrading
+ */
public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
long partitionId, String deviceId, long time) {
newlyFlushedPartitionLatestFlushedTimeForEachDevice
@@ -1937,7 +1975,9 @@ public class StorageGroupProcessor {
.compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
- /** put the memtable back to the MemTablePool and make the metadata in
writer visible */
+ /**
+ * put the memtable back to the MemTablePool and make the metadata in writer
visible
+ */
// TODO please consider concurrency with query and insert method.
private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor
tsFileProcessor)
throws TsFileProcessorException {
@@ -1979,7 +2019,7 @@ public class StorageGroupProcessor {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
tsFileManagement
- .new CompactionMergeTask(this::closeCompactionMergeCallBack,
timePartition));
+ .new
CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
} catch (IOException | RejectedExecutionException e) {
this.closeCompactionMergeCallBack();
logger.error(
@@ -1994,7 +2034,9 @@ public class StorageGroupProcessor {
}
}
- /** close compaction merge callback, to release some locks */
+ /**
+ * close compaction merge callback, to release some locks
+ */
private void closeCompactionMergeCallBack() {
this.compactionMergeWorking = false;
}
@@ -2276,11 +2318,12 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into them.
+ * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into
+ * them.
*
* @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one
to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file an insertion
position i >= -1 if the new
- * file can be inserted between [i, i+1]
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position
i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(
TsFileResource newTsFileResource,
@@ -2331,7 +2374,7 @@ public class StorageGroupProcessor {
* Compare each device in the two files to find the time relation of them.
*
* @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is
partially older than
- * fileB and partially newer than fileB (A X B) 1 if fileA is totally
newer than fileB (B < A)
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer
than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB)
{
boolean hasPre = false, hasSubsequence = false;
@@ -2452,7 +2495,8 @@ public class StorageGroupProcessor {
* <p>The sorting rules for tsfile names @see {@link this#compareFileName},
we can restore the
* list based on the file name and ensure the correctness of the order, so
there are three cases.
*
- * <p>1. The tsfile is to be inserted in the first place of the list. If the
timestamp in the file
+ * <p>1. The tsfile is to be inserted in the first place of the list. If the
timestamp in the
+ * file
* name is less than the timestamp in the file name of the first tsfile in
the list, then the file
* name is legal and the file name is returned directly. Otherwise, its
timestamp can be set to
* half of the timestamp value in the file name of the first tsfile in the
list , and the version
@@ -2468,9 +2512,9 @@ public class StorageGroupProcessor {
* name and returns directly; otherwise, the time stamp is the mean of the
timestamps of the two
* files, the version number is the version number in the tsfile with a
larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files
[insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(
@@ -2503,16 +2547,16 @@ public class StorageGroupProcessor {
}
/**
- * Update latest time in latestTimeForEachDevice and
- * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
+ * Update latest time in latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
+ * @UsedBy sync module, load external tsfile module.
*/
private void updateLatestTimeMap(TsFileResource newTsFileResource) {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = StorageEngine.getTimePartition(endTime);
if (!latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .containsKey(device)
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .containsKey(device)
|| latestTimeForEachDevice.get(timePartitionId).get(device) <
endTime) {
latestTimeForEachDevice.get(timePartitionId).put(device, endTime);
}
@@ -2534,8 +2578,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully @UsedBy sync module, load external
tsfile module.
*/
@@ -2643,7 +2687,7 @@ public class StorageGroupProcessor {
*
* @param tsfieToBeDeleted tsfile to be deleted
* @return whether the file to be deleted exists. @UsedBy sync module, load
external tsfile
- * module.
+ * module.
*/
public boolean deleteTsfile(File tsfieToBeDeleted) {
tsFileManagement.writeLock();
@@ -2777,14 +2821,14 @@ public class StorageGroupProcessor {
* "tsFileResource" have the same plan indexes as the local one.
*
* @return true if any file contains plans with indexes no less than the max
plan index of
- * "tsFileResource", otherwise false.
+ * "tsFileResource", otherwise false.
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
// examine working processor first as they have the largest plan index
return isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
|| isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getSequenceFileTreeSet())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getUnSequenceFileList());
}
@@ -2832,7 +2876,9 @@ public class StorageGroupProcessor {
return false;
}
- /** remove all partitions that satisfy a filter. */
+ /**
+ * remove all partitions that satisfy a filter.
+ */
public void removePartitions(TimePartitionFilter filter) {
// this requires blocking all other activities
tsFileManagement.writeLock();
@@ -2915,8 +2961,8 @@ public class StorageGroupProcessor {
isSequence =
plan.getTime()
> partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .getOrDefault(plan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
+ .get(timePartitionId)
+ .getOrDefault(plan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
}
// is unsequence and user set config to discard out of order data
if (!isSequence