This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch debug_apply_stuck
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/debug_apply_stuck by this push:
     new 56ef1bf  apply spotless
56ef1bf is described below

commit 56ef1bfd3d76f596a692f8e1d437cbd2222e0e51
Author: jt <[email protected]>
AuthorDate: Mon Apr 26 11:21:38 2021 +0800

    apply spotless
---
 .../engine/storagegroup/StorageGroupProcessor.java | 193 +++++++++------------
 1 file changed, 82 insertions(+), 111 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 35cd2a2..053c8a3 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -150,13 +150,9 @@ public class StorageGroupProcessor {
   private static final int MERGE_MOD_START_VERSION_NUM = 1;
 
   private static final Logger logger = 
LoggerFactory.getLogger(StorageGroupProcessor.class);
-  /**
-   * indicating the file to be loaded already exists locally.
-   */
+  /** indicating the file to be loaded already exists locally. */
   private static final int POS_ALREADY_EXIST = -2;
-  /**
-   * indicating the file to be loaded overlap with some files.
-   */
+  /** indicating the file to be loaded overlap with some files. */
   private static final int POS_OVERLAP = -3;
 
   private final boolean enableMemControl = config.isEnableMemControl();
@@ -167,25 +163,17 @@ public class StorageGroupProcessor {
    * partitionLatestFlushedTimeForEachDevice)
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
-  /**
-   * closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done.
-   */
+  /** closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done. */
   private final Object closeStorageGroupCondition = new Object();
   /**
    * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when 
a query is executed.
    */
   private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
-  /**
-   * time partition id in the storage group -> tsFileProcessor for this time 
partition
-   */
+  /** time partition id in the storage group -> tsFileProcessor for this time 
partition */
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = 
new TreeMap<>();
-  /**
-   * time partition id in the storage group -> tsFileProcessor for this time 
partition
-   */
+  /** time partition id in the storage group -> tsFileProcessor for this time 
partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors 
= new TreeMap<>();
-  /**
-   * compactionMergeWorking is used to wait for last compaction to be done.
-   */
+  /** compactionMergeWorking is used to wait for last compaction to be done. */
   private volatile boolean compactionMergeWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -216,15 +204,14 @@ public class StorageGroupProcessor {
    */
   private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice 
= new HashMap<>();
 
-  /**
-   * used to record the latest flush time while upgrading and inserting
-   */
+  /** used to record the latest flush time while upgrading and inserting */
   private Map<Long, Map<String, Long>> 
newlyFlushedPartitionLatestFlushedTimeForEachDevice =
       new HashMap<>();
   /**
    * global mapping of device -> largest timestamp of the latest memtable to * 
be submitted to
    * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to 
maintain global
-   * latestFlushedTime of devices and will be updated along with 
partitionLatestFlushedTimeForEachDevice
+   * latestFlushedTime of devices and will be updated along with
+   * partitionLatestFlushedTimeForEachDevice
    */
   private Map<String, Long> globalLatestFlushedTimeForEachDevice = new 
HashMap<>();
 
@@ -284,9 +271,7 @@ public class StorageGroupProcessor {
   // DEFAULT_POOL_TRIM_INTERVAL_MILLIS
   private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
 
-  /**
-   * get the direct byte buffer from pool, each fetch contains two ByteBuffer
-   */
+  /** get the direct byte buffer from pool, each fetch contains two ByteBuffer 
*/
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
     synchronized (walByteBufferPool) {
@@ -325,15 +310,16 @@ public class StorageGroupProcessor {
         timeWhenPoolNotEmpty = Long.MAX_VALUE;
       }
     }
-    logger
-        .info("Thread {} retrieves a WAL buffer {}, pool size: {}/{}", 
Thread.currentThread(), res,
-            walByteBufferPool.size(), currentWalPoolSize);
+    logger.info(
+        "Thread {} retrieves a WAL buffer {}, pool size: {}/{}",
+        Thread.currentThread(),
+        res,
+        walByteBufferPool.size(),
+        currentWalPoolSize);
     return res;
   }
 
-  /**
-   * put the byteBuffer back to pool
-   */
+  /** put the byteBuffer back to pool */
   public void releaseWalBuffer(ByteBuffer[] byteBuffers) {
     for (ByteBuffer byteBuffer : byteBuffers) {
       byteBuffer.clear();
@@ -346,14 +332,16 @@ public class StorageGroupProcessor {
       walByteBufferPool.addLast(byteBuffers[0]);
       walByteBufferPool.addLast(byteBuffers[1]);
       walByteBufferPool.notifyAll();
-      logger.info("Thread {} releases a WAL buffer {}, pool size: {}/{}", 
Thread.currentThread(),
-          byteBuffers, walByteBufferPool.size(), currentWalPoolSize);
+      logger.info(
+          "Thread {} releases a WAL buffer {}, pool size: {}/{}",
+          Thread.currentThread(),
+          byteBuffers,
+          walByteBufferPool.size(),
+          currentWalPoolSize);
     }
   }
 
-  /**
-   * trim the size of the pool and release the memory of needless direct byte 
buffer
-   */
+  /** trim the size of the pool and release the memory of needless direct byte 
buffer */
   private void trimTask() {
     synchronized (walByteBufferPool) {
       int expectedSize =
@@ -370,8 +358,11 @@ public class StorageGroupProcessor {
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
         currentWalPoolSize -= 2;
-        logger.info("Thread {} trims a WAL buffer , pool size: {}/{}", 
Thread.currentThread(),
-           walByteBufferPool.size(), currentWalPoolSize);
+        logger.info(
+            "Thread {} trims a WAL buffer , pool size: {}/{}",
+            Thread.currentThread(),
+            walByteBufferPool.size(),
+            currentWalPoolSize);
       }
     }
   }
@@ -379,9 +370,9 @@ public class StorageGroupProcessor {
   /**
    * constrcut a storage group processor
    *
-   * @param systemDir               system dir path
-   * @param virtualStorageGroupId   virtual storage group id e.g. 1
-   * @param fileFlushPolicy         file flush policy
+   * @param systemDir system dir path
+   * @param virtualStorageGroupId virtual storage group id e.g. 1
+   * @param fileFlushPolicy file flush policy
    * @param logicalStorageGroupName logical storage group name e.g. root.sg1
    */
   public StorageGroupProcessor(
@@ -829,8 +820,8 @@ public class StorageGroupProcessor {
       boolean isSequence =
           insertRowPlan.getTime()
               > partitionLatestFlushedTimeForEachDevice
-              .get(timePartitionId)
-              .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
+                  .get(timePartitionId)
+                  .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
 
       // is unsequence and user set config to discard out of order data
       if (!isSequence
@@ -924,7 +915,7 @@ public class StorageGroupProcessor {
               || 
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
             noFailure =
                 insertTabletToTsFileProcessor(
-                    insertTabletPlan, before, loc, isSequence, results, 
beforeTimePartition)
+                        insertTabletPlan, before, loc, isSequence, results, 
beforeTimePartition)
                     && noFailure;
           }
           // re initialize
@@ -945,7 +936,7 @@ public class StorageGroupProcessor {
             if 
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
               noFailure =
                   insertTabletToTsFileProcessor(
-                      insertTabletPlan, before, loc, false, results, 
beforeTimePartition)
+                          insertTabletPlan, before, loc, false, results, 
beforeTimePartition)
                       && noFailure;
             }
             before = loc;
@@ -958,10 +949,10 @@ public class StorageGroupProcessor {
       // do not forget last part
       if (before < loc
           && (isSequence
-          || 
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+              || 
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
         noFailure =
             insertTabletToTsFileProcessor(
-                insertTabletPlan, before, loc, isSequence, results, 
beforeTimePartition)
+                    insertTabletPlan, before, loc, isSequence, results, 
beforeTimePartition)
                 && noFailure;
       }
       long globalLatestFlushedTime =
@@ -980,9 +971,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * @return whether the given time falls in ttl
-   */
+  /** @return whether the given time falls in ttl */
   private boolean isAlive(long time) {
     return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= 
dataTTL;
   }
@@ -992,11 +981,11 @@ public class StorageGroupProcessor {
    * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence         whether is sequence
-   * @param start            start index of rows to be inserted in 
insertTabletPlan
-   * @param end              end index of rows to be inserted in 
insertTabletPlan
-   * @param results          result array
-   * @param timePartitionId  time partition id
+   * @param sequence whether is sequence
+   * @param start start index of rows to be inserted in insertTabletPlan
+   * @param end end index of rows to be inserted in insertTabletPlan
+   * @param results result array
+   * @param timePartitionId time partition id
    * @return false if any failure occurs when inserting the tablet, true 
otherwise
    */
   private boolean insertTabletToTsFileProcessor(
@@ -1036,9 +1025,9 @@ public class StorageGroupProcessor {
     // try to update the latest time of the device of this tsRecord
     if (sequence
         && latestTimeForEachDevice
-        .get(timePartitionId)
-        .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE)
-        < insertTabletPlan.getTimes()[end - 1]) {
+                .get(timePartitionId)
+                .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE)
+            < insertTabletPlan.getTimes()[end - 1]) {
       latestTimeForEachDevice
           .get(timePartitionId)
           .put(insertTabletPlan.getDeviceId().getFullPath(), 
insertTabletPlan.getTimes()[end - 1]);
@@ -1090,8 +1079,8 @@ public class StorageGroupProcessor {
 
     // try to update the latest time of the device of this tsRecord
     if (latestTimeForEachDevice
-        .get(timePartitionId)
-        .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE)
+            .get(timePartitionId)
+            .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE)
         < insertRowPlan.getTime()) {
       latestTimeForEachDevice
           .get(timePartitionId)
@@ -1174,9 +1163,9 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId            time partition range
+   * @param timeRangeId time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence               whether is sequence or not
+   * @param sequence whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(
       long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, 
boolean sequence)
@@ -1313,9 +1302,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * thread-safety should be ensured by caller
-   */
+  /** thread-safety should be ensured by caller */
   public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor 
tsFileProcessor) {
     // for sequence tsfile, we update the endTimeMap only when the file is 
prepared to be closed.
     // for unsequence tsfile, we have maintained the endTimeMap when an 
insertion comes.
@@ -1353,9 +1340,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * delete the storageGroup's own folder in folder data/system/storage_groups
-   */
+  /** delete the storageGroup's own folder in folder 
data/system/storage_groups */
   public void deleteFolder(String systemDir) {
     logger.info(
         "{} will close all files for deleting data folder {}",
@@ -1446,9 +1431,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * Iterate each TsFile and try to lock and remove those out of TTL.
-   */
+  /** Iterate each TsFile and try to lock and remove those out of TTL. */
   public synchronized void checkFilesTTL() {
     if (dataTTL == Long.MAX_VALUE) {
       logger.debug(
@@ -1515,9 +1498,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * This method will be blocked until all tsfile processors are closed.
-   */
+  /** This method will be blocked until all tsfile processors are closed. */
   public void syncCloseAllWorkingTsFileProcessors() {
     synchronized (closeStorageGroupCondition) {
       try {
@@ -1717,9 +1698,9 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param path      the timeseries path of the to be deleted.
+   * @param path the timeseries path of the to be deleted.
    * @param startTime the startTime of delete range.
-   * @param endTime   the endTime of delete range.
+   * @param endTime the endTime of delete range.
    */
   public void delete(PartialPath path, long startTime, long endTime, long 
planIndex)
       throws IOException {
@@ -1930,7 +1911,7 @@ public class StorageGroupProcessor {
   /**
    * update latest flush time for partition id
    *
-   * @param partitionId     partition id
+   * @param partitionId partition id
    * @param latestFlushTime lastest flush time
    * @return true if update latest flush time success
    */
@@ -1965,9 +1946,7 @@ public class StorageGroupProcessor {
     return true;
   }
 
-  /**
-   * used for upgrading
-   */
+  /** used for upgrading */
   public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
       long partitionId, String deviceId, long time) {
     newlyFlushedPartitionLatestFlushedTimeForEachDevice
@@ -1975,9 +1954,7 @@ public class StorageGroupProcessor {
         .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
-  /**
-   * put the memtable back to the MemTablePool and make the metadata in writer 
visible
-   */
+  /** put the memtable back to the MemTablePool and make the metadata in 
writer visible */
   // TODO please consider concurrency with query and insert method.
   private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor 
tsFileProcessor)
       throws TsFileProcessorException {
@@ -2019,7 +1996,7 @@ public class StorageGroupProcessor {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
                 tsFileManagement
-                    .new 
CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
+                .new CompactionMergeTask(this::closeCompactionMergeCallBack, 
timePartition));
       } catch (IOException | RejectedExecutionException e) {
         this.closeCompactionMergeCallBack();
         logger.error(
@@ -2034,9 +2011,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * close compaction merge callback, to release some locks
-   */
+  /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack() {
     this.compactionMergeWorking = false;
   }
@@ -2318,12 +2293,11 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can 
be inserted into
-   * them.
+   * Find the position of "newTsFileResource" in the sequence files if it can 
be inserted into them.
    *
    * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one 
to be inserted
-   * POS_OVERLAP(-3) if some file overlaps the new file an insertion position 
i >= -1 if the new
-   * file can be inserted between [i, i+1]
+   *     POS_OVERLAP(-3) if some file overlaps the new file an insertion 
position i >= -1 if the new
+   *     file can be inserted between [i, i+1]
    */
   private int findInsertionPosition(
       TsFileResource newTsFileResource,
@@ -2374,7 +2348,7 @@ public class StorageGroupProcessor {
    * Compare each device in the two files to find the time relation of them.
    *
    * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is 
partially older than
-   * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer 
than fileB (B < A)
+   *     fileB and partially newer than fileB (A X B) 1 if fileA is totally 
newer than fileB (B < A)
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) 
{
     boolean hasPre = false, hasSubsequence = false;
@@ -2495,8 +2469,7 @@ public class StorageGroupProcessor {
    * <p>The sorting rules for tsfile names @see {@link this#compareFileName}, 
we can restore the
    * list based on the file name and ensure the correctness of the order, so 
there are three cases.
    *
-   * <p>1. The tsfile is to be inserted in the first place of the list. If the 
timestamp in the
-   * file
+   * <p>1. The tsfile is to be inserted in the first place of the list. If the 
timestamp in the file
    * name is less than the timestamp in the file name of the first tsfile in 
the list, then the file
    * name is legal and the file name is returned directly. Otherwise, its 
timestamp can be set to
    * half of the timestamp value in the file name of the first tsfile in the 
list , and the version
@@ -2512,9 +2485,9 @@ public class StorageGroupProcessor {
    * name and returns directly; otherwise, the time stamp is the mean of the 
timestamps of the two
    * files, the version number is the version number in the tsfile with a 
larger timestamp.
    *
-   * @param tsfileName  origin tsfile name
+   * @param tsfileName origin tsfile name
    * @param insertIndex the new file will be inserted between the files 
[insertIndex, insertIndex +
-   *                    1]
+   *     1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(
@@ -2547,16 +2520,16 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Update latest time in latestTimeForEachDevice and 
partitionLatestFlushedTimeForEachDevice.
-   * @UsedBy sync module, load external tsfile module.
+   * Update latest time in latestTimeForEachDevice and
+   * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load 
external tsfile module.
    */
   private void updateLatestTimeMap(TsFileResource newTsFileResource) {
     for (String device : newTsFileResource.getDevices()) {
       long endTime = newTsFileResource.getEndTime(device);
       long timePartitionId = StorageEngine.getTimePartition(endTime);
       if (!latestTimeForEachDevice
-          .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-          .containsKey(device)
+              .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+              .containsKey(device)
           || latestTimeForEachDevice.get(timePartitionId).get(device) < 
endTime) {
         latestTimeForEachDevice.get(timePartitionId).put(device, endTime);
       }
@@ -2578,8 +2551,8 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type            load type
-   * @param tsFileResource  tsfile resource to be loaded
+   * @param type load type
+   * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @return load the file successfully @UsedBy sync module, load external 
tsfile module.
    */
@@ -2687,7 +2660,7 @@ public class StorageGroupProcessor {
    *
    * @param tsfieToBeDeleted tsfile to be deleted
    * @return whether the file to be deleted exists. @UsedBy sync module, load 
external tsfile
-   * module.
+   *     module.
    */
   public boolean deleteTsfile(File tsfieToBeDeleted) {
     tsFileManagement.writeLock();
@@ -2821,14 +2794,14 @@ public class StorageGroupProcessor {
    * "tsFileResource" have the same plan indexes as the local one.
    *
    * @return true if any file contains plans with indexes no less than the max 
plan index of
-   * "tsFileResource", otherwise false.
+   *     "tsFileResource", otherwise false.
    */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long 
partitionNum) {
     // examine working processor first as they have the largest plan index
     return isFileAlreadyExistInWorking(
-        tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+            tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
         || isFileAlreadyExistInWorking(
-        tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+            tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
         || isFileAlreadyExistInClosed(tsFileResource, partitionNum, 
getSequenceFileTreeSet())
         || isFileAlreadyExistInClosed(tsFileResource, partitionNum, 
getUnSequenceFileList());
   }
@@ -2876,9 +2849,7 @@ public class StorageGroupProcessor {
     return false;
   }
 
-  /**
-   * remove all partitions that satisfy a filter.
-   */
+  /** remove all partitions that satisfy a filter. */
   public void removePartitions(TimePartitionFilter filter) {
     // this requires blocking all other activities
     tsFileManagement.writeLock();
@@ -2961,8 +2932,8 @@ public class StorageGroupProcessor {
           isSequence =
               plan.getTime()
                   > partitionLatestFlushedTimeForEachDevice
-                  .get(timePartitionId)
-                  .getOrDefault(plan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
+                      .get(timePartitionId)
+                      .getOrDefault(plan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
         }
         // is unsequence and user set config to discard out of order data
         if (!isSequence

Reply via email to