This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch add_log_event
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4f80e0bd2f58c1e98260c09e2b61ba9fc7cd7b42
Author: qiaojialin <[email protected]>
AuthorDate: Wed Sep 14 11:12:09 2022 +0800

    add event name in log
---
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 15 ++++++-----
 .../mpp/execution/exchange/LocalSourceHandle.java  | 12 +++++----
 .../execution/exchange/MPPDataExchangeManager.java | 29 +++++++++++-----------
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  2 +-
 .../db/mpp/execution/exchange/SinkHandle.java      | 21 ++++++----------
 .../db/mpp/execution/exchange/SourceHandle.java    | 15 +++++------
 .../fragment/FragmentInstanceStateMachine.java     |  2 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  4 +--
 8 files changed, 49 insertions(+), 51 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index aaba4b8d0b..1f70ff4460 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -113,7 +113,7 @@ public class LocalSinkHandle implements ISinkHandle {
       if (queue.hasNoMoreTsBlocks()) {
         return;
       }
-      logger.info("send TsBlocks");
+      logger.info("[StartSendTsBlockOnLocal]");
       synchronized (this) {
         blocked = queue.add(tsBlock);
       }
@@ -129,9 +129,8 @@ public class LocalSinkHandle implements ISinkHandle {
   public void setNoMoreTsBlocks() {
     synchronized (queue) {
       synchronized (this) {
-        logger.info("set noMoreTsBlocks.");
+        logger.info("[StartSetNoMoreTsBlocksOnLocal]");
         if (aborted || closed) {
-          logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed);
           return;
         }
         queue.setNoMoreTsBlocks(true);
@@ -139,12 +138,12 @@ public class LocalSinkHandle implements ISinkHandle {
       }
     }
     checkAndInvokeOnFinished();
-    logger.info("noMoreTsBlocks has been set.");
+    logger.info("[EndSetNoMoreTsBlocksOnLocal]");
   }
 
   @Override
   public void abort() {
-    logger.info("Sink handle is being aborted.");
+    logger.info("[StartAbortLocalSinkHandle]");
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
@@ -155,12 +154,12 @@ public class LocalSinkHandle implements ISinkHandle {
         sinkHandleListener.onAborted(this);
       }
     }
-    logger.info("Sink handle is aborted");
+    logger.info("[EndAbortLocalSinkHandle]");
   }
 
   @Override
   public void close() {
-    logger.info("Sink handle is being closed.");
+    logger.info("[StartCloseLocalSinkHandle]");
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
@@ -171,7 +170,7 @@ public class LocalSinkHandle implements ISinkHandle {
         sinkHandleListener.onFinish(this);
       }
     }
-    logger.info("Sink handle is closed");
+    logger.info("[EndCloseLocalSinkHandle]");
   }
 
   public TFragmentInstanceId getRemoteFragmentInstanceId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 35c7cda823..c8182acf8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -95,7 +95,9 @@ public class LocalSourceHandle implements ISourceHandle {
       if (tsBlock != null) {
         currSequenceId++;
         logger.info(
-            "Receive {} TsdBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes());
+            "[GetTsBlockFromQueue] TsBlock:{} size:{}",
+            currSequenceId,
+            tsBlock.getRetainedSizeInBytes());
       }
       checkAndInvokeOnFinished();
       return tsBlock;
@@ -139,7 +141,7 @@ public class LocalSourceHandle implements ISourceHandle {
       return;
     }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      logger.info("Source handle is being aborted.");
+      logger.info("[StartAbortLocalSourceHandle]");
       synchronized (queue) {
         synchronized (this) {
           if (aborted || closed) {
@@ -150,7 +152,7 @@ public class LocalSourceHandle implements ISourceHandle {
           sourceHandleListener.onAborted(this);
         }
       }
-      logger.info("Source handle is aborted");
+      logger.info("[EndAbortLocalSourceHandle]");
     }
   }
 
@@ -160,7 +162,7 @@ public class LocalSourceHandle implements ISourceHandle {
       return;
     }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      logger.info("Source handle is being closed.");
+      logger.info("[StartCloseLocalSourceHandle]");
       synchronized (queue) {
         synchronized (this) {
           if (aborted || closed) {
@@ -171,7 +173,7 @@ public class LocalSourceHandle implements ISourceHandle {
           sourceHandleListener.onFinished(this);
         }
       }
-      logger.info("Source handle is closed");
+      logger.info("[EndCloseLocalSourceHandle]");
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index e14672ec88..c9634a75a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -80,10 +80,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       try (SetThreadName fragmentInstanceName =
           new SetThreadName(createFullIdFrom(req.sourceFragmentInstanceId, "SinkHandle"))) {
         logger.debug(
-            "Get data block request received, for data blocks whose sequence ID in [{}, {}) from {}.",
+            "[ProcessGetTsBlockRequest] sequence ID in [{}, {})",
             req.getStartSequenceId(),
-            req.getEndSequenceId(),
-            req.getSourceFragmentInstanceId());
+            req.getEndSequenceId());
         if (!sinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
           throw new TException(
               "Source fragment instance not found. Fragment instance ID: "
@@ -215,12 +214,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
-      logger.info("finished and release resources");
+      logger.info("[ScHListenerOnFinish]");
       if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
           || !sourceHandles
               .get(sourceHandle.getLocalFragmentInstanceId())
               .containsKey(sourceHandle.getLocalPlanNodeId())) {
-        logger.info("resources has already been released");
+        logger.warn("[ScHListenerAlreadyReleased]");
       } else {
         sourceHandles
             .get(sourceHandle.getLocalFragmentInstanceId())
@@ -234,7 +233,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onAborted(ISourceHandle sourceHandle) {
-      logger.info("onAborted is invoked");
+      logger.info("[ScHListenerOnAbort]");
       onFinished(sourceHandle);
     }
 
@@ -262,28 +261,29 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onFinish(ISinkHandle sinkHandle) {
-      logger.info("onFinish is invoked");
+      logger.info("[SkHListenerOnFinish]");
       removeFromMPPDataExchangeManager(sinkHandle);
       context.finished();
     }
 
     @Override
     public void onEndOfBlocks(ISinkHandle sinkHandle) {
+      logger.info("[SkHListenerOnEndOfTsBlocks]");
       context.transitionToFlushing();
     }
 
     @Override
     public void onAborted(ISinkHandle sinkHandle) {
-      logger.info("onAborted is invoked");
+      logger.info("[SkHListenerOnAbort]");
       removeFromMPPDataExchangeManager(sinkHandle);
     }
 
     private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
-      logger.info("release resources of finished sink handle");
-      if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
-        logger.info("resources already been released");
+      if (sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()) == null) {
+        logger.warn("[RemoveNoSinkHandle]");
+      } else {
+        logger.info("[RemoveSinkHandle]");
       }
-      sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
     }
 
     @Override
@@ -493,7 +493,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    * <p>This method should be called when a fragment instance finished in an abnormal state.
    */
   public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) {
-    logger.info("Force deregister fragment instance");
+    logger.info("[StartForceReleaseFIDataExchangeResource]");
     if (sinkHandles.containsKey(fragmentInstanceId)) {
       ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId);
       sinkHandle.abort();
@@ -502,11 +502,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     if (sourceHandles.containsKey(fragmentInstanceId)) {
       Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
       for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
-        logger.info("Close source handle {}", sourceHandles);
+        logger.info("[CloseSourceHandle] {}", entry.getKey());
         entry.getValue().abort();
       }
       sourceHandles.remove(fragmentInstanceId);
     }
+    logger.info("[EndForceReleaseFIDataExchangeResource]");
   }
 
   /** @param suffix should be like [PlanNodeId].SourceHandle/SinHandle */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 1222ee5ef0..ed044814a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -96,7 +96,7 @@ public class SharedTsBlockQueue {
 
   /** Notify no more tsblocks will be added to the queue. */
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
-    logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
+    logger.info("[SignalNoMoreTsBlockOnQueue]");
     if (closed) {
       logger.warn("queue has been destroyed");
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 703a04d760..63fb42784c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -167,7 +167,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void sendEndOfDataBlockEvent() throws Exception {
-    logger.info("send end of data block event");
+    logger.info("[NotifyNoMoreTsBlock]");
     int attempt = 0;
     TEndOfDataBlockEvent endOfDataBlockEvent =
         new TEndOfDataBlockEvent(
@@ -193,7 +193,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void setNoMoreTsBlocks() {
-    logger.info("start to set no-more-tsblocks");
+    logger.info("[StartSetNoMoreTsBlocks]");
     if (aborted || closed) {
       return;
     }
@@ -202,20 +202,17 @@ public class SinkHandle implements ISinkHandle {
     } catch (Exception e) {
       throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
     }
-    logger.info("set noMoreTsBlocks to true");
     noMoreTsBlocks = true;
 
     if (isFinished()) {
-      logger.info("revoke onFinish() of sinkHandleListener");
       sinkHandleListener.onFinish(this);
     }
-    logger.info("revoke onEndOfBlocks() of sinkHandleListener");
     sinkHandleListener.onEndOfBlocks(this);
   }
 
   @Override
   public synchronized void abort() {
-    logger.info("SinkHandle is being aborted.");
+    logger.info("[StartAbortSinkHandle]");
     sequenceIdToTsBlock.clear();
     aborted = true;
     bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
@@ -226,12 +223,12 @@ public class SinkHandle implements ISinkHandle {
       bufferRetainedSizeInBytes = 0;
     }
     sinkHandleListener.onAborted(this);
-    logger.info("SinkHandle is aborted");
+    logger.info("[EndAbortSinkHandle]");
   }
 
   @Override
   public synchronized void close() {
-    logger.info("SinkHandle is being closed.");
+    logger.info("[StartCloseSinkHandle]");
     sequenceIdToTsBlock.clear();
     closed = true;
     bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
@@ -242,7 +239,7 @@ public class SinkHandle implements ISinkHandle {
       bufferRetainedSizeInBytes = 0;
     }
     sinkHandleListener.onFinish(this);
-    logger.info("SinkHandle is closed");
+    logger.info("[EndCloseSinkHandle]");
   }
 
   @Override
@@ -307,7 +304,7 @@ public class SinkHandle implements ISinkHandle {
         freedBytes += entry.getValue().right;
         bufferRetainedSizeInBytes -= entry.getValue().right;
         iterator.remove();
-        logger.info("ack TsBlock {}.", entry.getKey());
+        logger.info("[ACKTsBlock] {}.", entry.getKey());
       }
     }
     if (isFinished()) {
@@ -381,9 +378,7 @@ public class SinkHandle implements ISinkHandle {
     public void run() {
       try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
         logger.info(
-            "Send new data block event [{}, {})",
-            startSequenceId,
-            startSequenceId + blockSizes.size());
+            "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
         int attempt = 0;
         TNewDataBlockEvent newDataBlockEvent =
             new TNewDataBlockEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index bd95291eb9..9b8d6c548f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -130,13 +130,13 @@ public class SourceHandle implements ISourceHandle {
         return null;
       }
       long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
-      logger.info("Receive {} TsBlock, size is {}", currSequenceId, retainedSize);
+      logger.info("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize);
       currSequenceId += 1;
       bufferRetainedSizeInBytes -= retainedSize;
       localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize);
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
-        logger.info("no buffered TsBlock, blocked");
+        logger.info("[WaitForMoreTsBlock]");
         blocked = SettableFuture.create();
       }
       if (isFinished()) {
@@ -222,7 +222,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
-    logger.info("receive NoMoreTsBlock event. ");
+    logger.info("[ReceiveNoMoreTsBlockEvent]");
     this.lastSequenceId = lastSequenceId;
     if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
       blocked.set(null);
@@ -234,7 +234,7 @@ public class SourceHandle implements ISourceHandle {
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
     logger.info(
-        "receive newDataBlockEvent. [{}, {}), each size is: {}",
+        "[ReceiveNewTsBlockNotification] [{}, {}), each size is: {}",
         startSequenceId,
         startSequenceId + dataBlockSizes.size(),
         dataBlockSizes);
@@ -380,7 +380,7 @@ public class SourceHandle implements ISourceHandle {
     @Override
     public void run() {
       try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-        logger.info("try to get data blocks [{}, {}) ", startSequenceId, endSequenceId);
+        logger.info("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
         TGetDataBlockRequest req =
             new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
         int attempt = 0;
@@ -394,7 +394,7 @@ public class SourceHandle implements ISourceHandle {
               TsBlock tsBlock = serde.deserialize(byteBuffer);
               tsBlocks.add(tsBlock);
             }
-            logger.info("got data blocks. count: {}", tsBlocks.size());
+            logger.info("[EndPullTsBlocksFromRemote] Count:{}", tsBlocks.size());
             executorService.submit(
                 new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
             synchronized (SourceHandle.this) {
@@ -404,6 +404,7 @@ public class SourceHandle implements ISourceHandle {
               for (int i = startSequenceId; i < endSequenceId; i++) {
                 sequenceIdToTsBlock.put(i, tsBlocks.get(i - startSequenceId));
               }
+              logger.info("[PutTsBlocksIntoBuffer]");
               if (!blocked.isDone()) {
                 blocked.set(null);
               }
@@ -452,7 +453,7 @@ public class SourceHandle implements ISourceHandle {
     @Override
     public void run() {
       try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-        logger.info("send ack data block event [{}, {}).", startSequenceId, endSequenceId);
+        logger.info("[SendACKTsBlock] [{}, {}).", startSequenceId, endSequenceId);
         int attempt = 0;
         TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent =
             new TAcknowledgeDataBlockEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
index 699777d8f0..eef52841e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
@@ -78,7 +78,7 @@ public class FragmentInstanceStateMachine {
     instanceState.addStateChangeListener(
         newState -> {
           try (SetThreadName threadName = new SetThreadName(fragmentInstanceId.getFullId())) {
-            LOGGER.info("State transfer to {}", newState);
+            LOGGER.info("[StateChanged] To {}", newState);
           }
         });
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index f7ba31d0d9..6f896a6e9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -134,7 +134,7 @@ public class LocalExecutionPlanner {
         freeMemoryForOperators -= estimatedMemorySize;
         LOGGER.info(
             String.format(
-                "consume memory: %d, current remaining memory: %d",
+                "[ConsumeMemory] consume: %d, current remaining memory: %d",
                 estimatedMemorySize, freeMemoryForOperators));
       }
     }
@@ -148,7 +148,7 @@ public class LocalExecutionPlanner {
                 this.freeMemoryForOperators += estimatedMemorySize;
                 LOGGER.info(
                     String.format(
-                        "release memory: %d, current remaining memory: %d",
+                        "[ReleaseMemory] release: %d, current remaining memory: %d",
                         estimatedMemorySize, freeMemoryForOperators));
               }
             }

Reply via email to