This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6228880ecf9 Pipe: Optimized the logger semantic && the retry logic of 
memory error at sink subtask (#17166)
6228880ecf9 is described below

commit 6228880ecf9c79afa2e5712e2fa35a094263ca94
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:21:26 2026 +0800

    Pipe: Optimized the logger semantic && the retry logic of memory error at 
sink subtask (#17166)
    
    * shop
    
    * fix
    
    * sit
    
    * logger
---
 .../apache/iotdb/confignode/manager/ProcedureManager.java    |  4 ++--
 .../iotdb/confignode/persistence/pipe/PipePluginInfo.java    |  2 +-
 .../iotdb/confignode/persistence/pipe/PipeTaskInfo.java      |  4 ++--
 .../impl/pipe/plugin/CreatePipePluginProcedure.java          |  2 +-
 .../procedure/impl/pipe/plugin/DropPipePluginProcedure.java  |  2 +-
 .../execution/config/executor/ClusterConfigTaskExecutor.java | 12 +++++++-----
 .../java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java   |  1 +
 .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java     | 11 ++++++++++-
 8 files changed, 25 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index e7cc28f01bb..0fe3abc79a7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1501,9 +1501,9 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus alterPipe(TAlterPipeReq req) {
+  public TSStatus alterPipe(final TAlterPipeReq req) {
     try {
-      AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
+      final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
       executor.submitProcedure(procedure);
       TSStatus status = waitingProcedureFinished(procedure);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 51007f10236..e53ef00b308 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -174,7 +174,7 @@ public class PipePluginInfo implements SnapshotProcessor {
           String.format(
               "Failed to create or alter pipe, the pipe extractor plugin %s 
does not exist",
               sourcePluginName);
-      LOGGER.warn(exceptionMessage);
+      LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 0b3acb4058e..929024a0689 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -185,7 +185,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
         String.format(
             "Failed to create pipe %s, %s",
             createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
-    LOGGER.warn(exceptionMessage);
+    LOGGER.info(exceptionMessage);
     throw new PipeException(exceptionMessage);
   }
 
@@ -205,7 +205,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
       final String exceptionMessage =
           String.format(
               "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), 
PIPE_NOT_EXIST_MSG);
-      LOGGER.warn(exceptionMessage);
+      LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index e3a4719a719..f4fa738428d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -145,7 +145,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
       }
     } catch (PipeException e) {
       // The pipe plugin has already created, we should end the procedure
-      LOGGER.warn(
+      LOGGER.info(
           "Pipe plugin {} is already created, end the 
CreatePipePluginProcedure({})",
           pluginName,
           pluginName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 665a3782a91..efbe1ee6ccd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -143,7 +143,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
       subscriptionInfo.validatePipePluginUsageByTopic(pluginName);
     } catch (PipeException e) {
       // if the pipe plugin is a built-in plugin, we should not drop it
-      LOGGER.warn(e.getMessage());
+      LOGGER.info(e.getMessage());
       pipePluginCoordinator.unlock();
       pipeTaskCoordinator.unlock();
       setFailure(new ProcedureException(e.getMessage()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 3627e3f82af..52c85334c18 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1071,7 +1071,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         final Visibility pluginVisibility = 
VisibilityUtils.calculateFromPluginClass(clazz);
         final boolean isTableModel = createPipePluginStatement.isTableModel();
         if (!VisibilityUtils.isCompatible(pluginVisibility, isTableModel)) {
-          LOGGER.warn(
+          LOGGER.info(
               "Failed to create PipePlugin({}) because this plugin is not 
designed for {} model.",
               createPipePluginStatement.getPluginName(),
               isTableModel ? "table" : "tree");
@@ -1151,10 +1151,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                   
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition())
                   .setIsTableModel(dropPipePluginStatement.isTableModel()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
executionStatus.getCode()) {
-        LOGGER.warn(
-            "[{}] Failed to drop pipe plugin {}.",
-            executionStatus,
-            dropPipePluginStatement.getPluginName());
+        if (TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() != 
executionStatus.getCode()) {
+          LOGGER.warn(
+              "[{}] Failed to drop pipe plugin {}.",
+              executionStatus,
+              dropPipePluginStatement.getPluginName());
+        }
         future.setException(new IoTDBException(executionStatus));
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index b70d51cca95..34d2e9bf82c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -117,6 +117,7 @@ public class ErrorHandlingUtils {
             || status.getCode() == 
TSStatusCode.EXECUTE_UDF_ERROR.getStatusCode()
             || status.getCode() == 
TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()
             || status.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+            || status.getCode() == 
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode()
             || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) 
{
           LOGGER.info(message);
         } else {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index af75e6424f7..5f7156f6324 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.subtask;
 
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -33,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,7 +271,13 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   protected void handleException(final Event event, final Exception e) {
-    if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+    if (e instanceof PipeRuntimeOutOfMemoryCriticalException
+        || ExceptionUtils.getRootCause(e) instanceof 
PipeRuntimeOutOfMemoryCriticalException) {
+      PipeLogger.log(
+          LOGGER::info,
+          e,
+          "Temporarily out of memory in pipe event transferring, will wait for 
the memory to release.");
+    } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) 
{
       if (lastExceptionTime == Long.MAX_VALUE) {
         lastExceptionTime = System.currentTimeMillis();
       }

Reply via email to