This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch compation-log
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ea15c88f290ec80d063c44fcf303ac3971f59c14
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 20 11:34:33 2025 +0800

    modify TsFileProcessor
---
 .../dataregion/flush/MemTableFlushTask.java        |   3 +-
 .../dataregion/memtable/TsFileProcessor.java       | 141 +++++++++++----------
 2 files changed, 74 insertions(+), 70 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index d1a3f3c5bab..8d586dfc619 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.flush;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -60,7 +61,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class MemTableFlushTask {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MemTableFlushTask.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.WRITE_LOGGER_NAME);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
       FlushSubTaskPoolManager.getInstance();
   private static final WritingMetrics WRITING_METRICS = 
WritingMetrics.getInstance();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bd4c11e969b..9cc35f11d3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
@@ -120,7 +121,9 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORK
 public class TsFileProcessor {
 
   /** Logger fot this class. */
-  private static final Logger logger = 
LoggerFactory.getLogger(TsFileProcessor.class);
+  private static final Logger writeLogger = 
LoggerFactory.getLogger(IoTDBConstant.WRITE_LOGGER_NAME);
+  private static final Logger queryLogger = 
LoggerFactory.getLogger(IoTDBConstant.QUERY_LOGGER_NAME);
+  private static final Logger otherLogger = 
LoggerFactory.getLogger(TsFileProcessor.class);
 
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
@@ -226,7 +229,7 @@ public class TsFileProcessor {
     flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
     flushListeners.add(this.walNode);
     closeFileListeners.add(closeUnsealedTsFileProcessor);
-    logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
+    writeLogger.info("create a new tsfile processor {}", 
tsfile.getAbsolutePath());
   }
 
   @SuppressWarnings("java:S107") // ignore number of arguments
@@ -250,7 +253,7 @@ public class TsFileProcessor {
     flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
     flushListeners.add(this.walNode);
     closeFileListeners.add(closeUnsealedTsFileProcessor);
-    logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile());
+    writeLogger.info("reopen a tsfile processor {}", 
tsFileResource.getTsFile());
   }
 
   /**
@@ -296,7 +299,7 @@ public class TsFileProcessor {
       }
     } catch (Exception e) {
       rollbackMemoryInfo(memIncrements);
-      logger.warn("Exception during wal flush", e);
+      writeLogger.warn("Exception during wal flush", e);
       throw new WriteProcessException(
           String.format(
               "%s: %s write WAL failed: %s",
@@ -392,7 +395,7 @@ public class TsFileProcessor {
       }
     } catch (Exception e) {
       rollbackMemoryInfo(memIncrements);
-      logger.warn("Exception during wal flush", e);
+      writeLogger.warn("Exception during wal flush", e);
       throw new WriteProcessException(
           String.format(
               "%s: %s write WAL failed: %s",
@@ -1028,7 +1031,7 @@ public class TsFileProcessor {
     logFlushQueryWriteLocked();
     try {
       if (workMemTable != null) {
-        logger.info(
+        writeLogger.info(
             "[Deletion] Deletion with path: {}, time:{}-{} in workMemTable",
             deletion.getPath(),
             deletion.getStartTime(),
@@ -1069,7 +1072,7 @@ public class TsFileProcessor {
 
   @TestOnly
   public void syncClose() throws ExecutionException {
-    logger.info(
+    writeLogger.info(
         "Sync close file: {}, will firstly async close it",
         tsFileResource.getTsFile().getAbsolutePath());
     if (shouldClose) {
@@ -1077,7 +1080,7 @@ public class TsFileProcessor {
     }
     try {
       asyncClose().get();
-      logger.info("Start to wait until file {} is closed", tsFileResource);
+      writeLogger.info("Start to wait until file {} is closed", 
tsFileResource);
       // if this TsFileProcessor is closing, asyncClose().get() of this thread 
will return quickly,
       // but the TsFileProcessor may be not closed. Therefore, we need to 
check whether the writer
       // is null.
@@ -1087,7 +1090,7 @@ public class TsFileProcessor {
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
-    logger.info("File {} is closed synchronously", 
tsFileResource.getTsFile().getAbsolutePath());
+    writeLogger.info("File {} is closed synchronously", 
tsFileResource.getTsFile().getAbsolutePath());
   }
 
   /** async close one tsfile, register and close it by another thread */
@@ -1095,9 +1098,9 @@ public class TsFileProcessor {
     flushQueryLock.writeLock().lock();
     logFlushQueryWriteLocked();
     try {
-      if (logger.isDebugEnabled()) {
+      if (writeLogger.isDebugEnabled()) {
         if (workMemTable != null) {
-          logger.debug(
+          writeLogger.debug(
               "{}: flush a working memtable in async close tsfile {}, memtable 
size: {}, tsfile "
                   + "size: {}, plan index: [{}, {}], progress index: {}",
               storageGroupName,
@@ -1108,7 +1111,7 @@ public class TsFileProcessor {
               workMemTable.getMaxPlanIndex(),
               tsFileResource.getMaxProgressIndex());
         } else {
-          logger.debug(
+          writeLogger.debug(
               "{}: flush a NotifyFlushMemTable in async close tsfile {}, 
tsfile size: {}",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
@@ -1145,7 +1148,7 @@ public class TsFileProcessor {
         shouldClose = true;
         return future;
       } catch (Exception e) {
-        logger.error(
+        writeLogger.error(
             "{}: {} async close failed, because",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -1169,8 +1172,8 @@ public class TsFileProcessor {
     logFlushQueryWriteLocked();
     try {
       tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : 
workMemTable;
-      if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
-        logger.debug(
+      if (writeLogger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
+        writeLogger.debug(
             "{}: {} add a signal memtable into flushing memtable list when 
sync flush",
             storageGroupName,
             tsFileResource.getTsFile().getName());
@@ -1188,14 +1191,14 @@ public class TsFileProcessor {
           flushingMemTables.wait(1000);
 
           if ((System.currentTimeMillis() - startWait) > 60_000) {
-            logger.warn(
+            writeLogger.warn(
                 "has waited for synced flushing a memtable in {} for 60 
seconds.",
                 this.tsFileResource.getTsFile().getAbsolutePath());
             startWait = System.currentTimeMillis();
           }
         }
       } catch (InterruptedException e) {
-        logger.error(
+        writeLogger.error(
             "{}: {} wait flush finished meets error",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -1213,11 +1216,11 @@ public class TsFileProcessor {
       if (workMemTable == null) {
         return;
       }
-      logger.info(
+      writeLogger.info(
           "Async flush a memtable to tsfile: {}", 
tsFileResource.getTsFile().getAbsolutePath());
       addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
-      logger.error(
+      writeLogger.error(
           "{}: {} add a memtable into flushing list failed",
           storageGroupName,
           tsFileResource.getTsFile().getName(),
@@ -1257,8 +1260,8 @@ public class TsFileProcessor {
 
     
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
     flushingMemTables.addLast(tobeFlushed);
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug(
           "{}: {} Memtable (signal = {}) is added into the flushing Memtable, 
queue size = {}",
           storageGroupName,
           tsFileResource.getTsFile().getName(),
@@ -1284,13 +1287,13 @@ public class TsFileProcessor {
     try {
       writer.makeMetadataVisible();
       if (!flushingMemTables.remove(memTable)) {
-        logger.warn(
+        writeLogger.warn(
             "{}: {} put the memtable (signal={}) out of flushingMemtables but 
it is not in the queue.",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
             memTable.isSignalMemTable());
-      } else if (logger.isDebugEnabled()) {
-        logger.debug(
+      } else if (writeLogger.isDebugEnabled()) {
+        writeLogger.debug(
             "{}: {} memtable (signal={}) is removed from the queue. {} left.",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -1301,8 +1304,8 @@ public class TsFileProcessor {
       MemTableManager.getInstance().decreaseMemtableNumber();
       // Reset the mem cost in StorageGroupProcessorInfo
       dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
-      if (logger.isDebugEnabled()) {
-        logger.debug(
+      if (writeLogger.isDebugEnabled()) {
+        writeLogger.debug(
             "[mem control] {}: {} flush finished, try to reset system mem 
cost, "
                 + "flushing memtable list size: {}",
             storageGroupName,
@@ -1312,8 +1315,8 @@ public class TsFileProcessor {
       // Report to System
       SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
       
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
-      if (logger.isDebugEnabled()) {
-        logger.debug(
+      if (writeLogger.isDebugEnabled()) {
+        writeLogger.debug(
             "{}: {} flush finished, remove a memtable from flushing list, "
                 + "flushing memtable list size: {}",
             storageGroupName,
@@ -1321,7 +1324,7 @@ public class TsFileProcessor {
             flushingMemTables.size());
       }
     } catch (Exception e) {
-      logger.error("{}: {}", storageGroupName, 
tsFileResource.getTsFile().getName(), e);
+      writeLogger.error("{}: {}", storageGroupName, 
tsFileResource.getTsFile().getName(), e);
     } finally {
       flushQueryLock.writeLock().unlock();
       logFlushQueryWriteUnlocked();
@@ -1333,8 +1336,8 @@ public class TsFileProcessor {
     synchronized (flushingMemTables) {
       releaseFlushedMemTable(memTable);
       flushingMemTables.notifyAll();
-      if (logger.isDebugEnabled()) {
-        logger.debug(
+      if (writeLogger.isDebugEnabled()) {
+        writeLogger.debug(
             "{}: {} released a memtable (signal={}), flushingMemtables size 
={}",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -1355,7 +1358,7 @@ public class TsFileProcessor {
     // Signal memtable only may appear when calling asyncClose()
     if (!memTableToFlush.isSignalMemTable()) {
       if (memTableToFlush.isEmpty()) {
-        logger.info(
+        writeLogger.info(
             "This normal memtable is empty, skip flush. {}: {}",
             storageGroupName,
             tsFileResource.getTsFile().getName());
@@ -1372,7 +1375,7 @@ public class TsFileProcessor {
           memTableFlushPointCount = memTableToFlush.getTotalPointsNum();
         } catch (Throwable e) {
           if (writer == null) {
-            logger.info(
+            writeLogger.info(
                 "{}: {} is closed during flush, abandon flush task",
                 storageGroupName,
                 tsFileResource.getTsFile().getAbsolutePath());
@@ -1380,21 +1383,21 @@ public class TsFileProcessor {
               flushingMemTables.notifyAll();
             }
           } else {
-            logger.error(
+            writeLogger.error(
                 "{}: {} meet error when flushing a memtable, change system 
mode to error",
                 storageGroupName,
                 tsFileResource.getTsFile().getAbsolutePath(),
                 e);
             
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
             try {
-              logger.error(
+              writeLogger.error(
                   "{}: {} IOTask meets error, truncate the corrupted data",
                   storageGroupName,
                   tsFileResource.getTsFile().getAbsolutePath(),
                   e);
               writer.reset();
             } catch (IOException e1) {
-              logger.error(
+              writeLogger.error(
                   "{}: {} Truncate corrupted data meets error",
                   storageGroupName,
                   tsFileResource.getTsFile().getAbsolutePath(),
@@ -1416,7 +1419,7 @@ public class TsFileProcessor {
                 flushingMemTables.notifyAll();
               }
             } catch (Exception e1) {
-              logger.error(
+              writeLogger.error(
                   "{}: {} Release resource meets error",
                   storageGroupName,
                   tsFileResource.getTsFile().getAbsolutePath(),
@@ -1438,7 +1441,7 @@ public class TsFileProcessor {
           this.tsFileResource.getModFile().write(entry.left);
           tsFileResource.getModFile().close();
           iterator.remove();
-          logger.info(
+          otherLogger.info(
               "[Deletion] Deletion with path: {}, time:{}-{} written when 
flush memtable",
               entry.left.getPath(),
               ((Deletion) (entry.left)).getStartTime(),
@@ -1446,7 +1449,7 @@ public class TsFileProcessor {
         }
       }
     } catch (IOException e) {
-      logger.error(
+      otherLogger.error(
           "Meet error when writing into ModificationFile file of {} ",
           tsFileResource.getTsFile().getAbsolutePath(),
           e);
@@ -1454,8 +1457,8 @@ public class TsFileProcessor {
       flushQueryLock.writeLock().unlock();
     }
 
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug(
           "{}: {} try get lock to release a memtable (signal={})",
           storageGroupName,
           tsFileResource.getTsFile().getAbsolutePath(),
@@ -1467,7 +1470,7 @@ public class TsFileProcessor {
     try {
       writer.getTsFileOutput().force();
     } catch (IOException e) {
-      logger.error("fsync memTable data to disk error,", e);
+      writeLogger.error("fsync memTable data to disk error,", e);
     }
 
     // Call flushed listener after memtable is released safely
@@ -1483,19 +1486,19 @@ public class TsFileProcessor {
         } else {
           writer.mark();
           updateCompressionRatio();
-          if (logger.isDebugEnabled()) {
-            logger.debug(
+          if (writeLogger.isDebugEnabled()) {
+            writeLogger.debug(
                 "{}: {} flushingMemtables is empty and will close the file",
                 storageGroupName,
                 tsFileResource.getTsFile().getAbsolutePath());
           }
           endFile();
         }
-        if (logger.isDebugEnabled()) {
-          logger.debug("{} flushingMemtables is clear", storageGroupName);
+        if (writeLogger.isDebugEnabled()) {
+          writeLogger.debug("{} flushingMemtables is clear", storageGroupName);
         }
       } catch (Exception e) {
-        logger.error(
+        writeLogger.error(
             "{}: {} marking or ending file meet error",
             storageGroupName,
             tsFileResource.getTsFile().getAbsolutePath(),
@@ -1504,7 +1507,7 @@ public class TsFileProcessor {
         try {
           writer.reset();
         } catch (IOException e1) {
-          logger.error(
+          writeLogger.error(
               "{}: {} truncate corrupted data meets error",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
@@ -1512,7 +1515,7 @@ public class TsFileProcessor {
         }
         // Retry or set read-only
         if (retryCnt < 3) {
-          logger.warn(
+          writeLogger.warn(
               "{} meet error when flush FileMetadata to {}, retry it again",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
@@ -1520,7 +1523,7 @@ public class TsFileProcessor {
           retryCnt++;
           continue;
         } else {
-          logger.error(
+          writeLogger.error(
               "{} meet error when flush FileMetadata to {}, change system mode 
to error",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
@@ -1530,8 +1533,8 @@ public class TsFileProcessor {
         }
       }
       // For sync close
-      if (logger.isDebugEnabled()) {
-        logger.debug(
+      if (writeLogger.isDebugEnabled()) {
+        writeLogger.debug(
             "{}: {} try to get flushingMemtables lock.",
             storageGroupName,
             tsFileResource.getTsFile().getAbsolutePath());
@@ -1545,7 +1548,7 @@ public class TsFileProcessor {
   private void updateCompressionRatio() {
     try {
       double compressionRatio = ((double) totalMemTableSize) / writer.getPos();
-      logger.info(
+      writeLogger.info(
           "The compression ratio of tsfile {} is {}, totalMemTableSize: {}, 
the file size: {}",
           writer.getFile().getAbsolutePath(),
           String.format("%.2f", compressionRatio),
@@ -1556,7 +1559,7 @@ public class TsFileProcessor {
           .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, 
compressionRatio);
       CompressionRatio.getInstance().updateRatio(totalMemTableSize, 
writer.getPos());
     } catch (IOException e) {
-      logger.error(
+      writeLogger.error(
           "{}: {} update compression ratio failed",
           storageGroupName,
           tsFileResource.getTsFile().getName(),
@@ -1566,14 +1569,14 @@ public class TsFileProcessor {
 
   /** end file and write some meta */
   private void endFile() throws IOException, TsFileProcessorException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Start to end file {}", tsFileResource);
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug("Start to end file {}", tsFileResource);
     }
     writer.endFile();
     tsFileResource.serialize();
     FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
-    if (logger.isDebugEnabled()) {
-      logger.debug("Ended file {}", tsFileResource);
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug("Ended file {}", tsFileResource);
     }
     // Remove this processor from Closing list in StorageGroupProcessor,
     // Mark the TsFileResource closed, no need writer anymore
@@ -1589,7 +1592,7 @@ public class TsFileProcessor {
 
   /** End empty file and remove it from file system */
   private void endEmptyFile() throws TsFileProcessorException, IOException {
-    logger.info("Start to end empty file {}", tsFileResource);
+    writeLogger.info("Start to end empty file {}", tsFileResource);
 
     // Remove this processor from Closing list in DataRegion,
     // Mark the TsFileResource closed, no need writer anymore
@@ -1599,7 +1602,7 @@ public class TsFileProcessor {
     }
     tsFileProcessorInfo.clear();
     dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
-    logger.info(
+    writeLogger.info(
         "Storage group {} close and remove empty file {}",
         storageGroupName,
         tsFileResource.getTsFile().getAbsoluteFile());
@@ -1856,7 +1859,7 @@ public class TsFileProcessor {
           }
         }
       } catch (QueryProcessException | MetadataException | IOException e) {
-        logger.error(
+        queryLogger.error(
             "{}: {} get ReadOnlyMemChunk has error",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -1940,7 +1943,7 @@ public class TsFileProcessor {
           }
         }
       } catch (QueryProcessException | MetadataException | IOException e) {
-        logger.error(
+        queryLogger.error(
             "{}: {} get ReadOnlyMemChunk has error",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -2019,7 +2022,7 @@ public class TsFileProcessor {
           }
         }
       } catch (QueryProcessException | MetadataException e) {
-        logger.error(
+        queryLogger.error(
             "{}: {} get ReadOnlyMemChunk has error",
             storageGroupName,
             tsFileResource.getTsFile().getName(),
@@ -2141,22 +2144,22 @@ public class TsFileProcessor {
   }
 
   private void logFlushQueryWriteLocked() {
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug(
           FLUSH_QUERY_WRITE_LOCKED, storageGroupName, 
tsFileResource.getTsFile().getName());
     }
   }
 
   private void logFlushQueryWriteUnlocked() {
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug(
           FLUSH_QUERY_WRITE_RELEASE, storageGroupName, 
tsFileResource.getTsFile().getName());
     }
   }
 
   private void logFlushQueryReadUnlocked() {
-    if (logger.isDebugEnabled()) {
-      logger.debug(
+    if (writeLogger.isDebugEnabled()) {
+      writeLogger.debug(
           "{}: {} release flushQueryLock", storageGroupName, 
tsFileResource.getTsFile().getName());
     }
   }

Reply via email to