This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6228880ecf9 Pipe: Optimized the logging semantics and the retry logic for
memory errors in the sink subtask (#17166)
6228880ecf9 is described below
commit 6228880ecf9c79afa2e5712e2fa35a094263ca94
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:21:26 2026 +0800
Pipe: Optimized the logging semantics and the retry logic for memory errors in
the sink subtask (#17166)
* shop
* fix
* sit
* logger
---
.../apache/iotdb/confignode/manager/ProcedureManager.java | 4 ++--
.../iotdb/confignode/persistence/pipe/PipePluginInfo.java | 2 +-
.../iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 4 ++--
.../impl/pipe/plugin/CreatePipePluginProcedure.java | 2 +-
.../procedure/impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../execution/config/executor/ClusterConfigTaskExecutor.java | 12 +++++++-----
.../java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java | 1 +
.../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 11 ++++++++++-
8 files changed, 25 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index e7cc28f01bb..0fe3abc79a7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1501,9 +1501,9 @@ public class ProcedureManager {
}
}
- public TSStatus alterPipe(TAlterPipeReq req) {
+ public TSStatus alterPipe(final TAlterPipeReq req) {
try {
- AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
+ final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
executor.submitProcedure(procedure);
TSStatus status = waitingProcedureFinished(procedure);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 51007f10236..e53ef00b308 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -174,7 +174,7 @@ public class PipePluginInfo implements SnapshotProcessor {
String.format(
"Failed to create or alter pipe, the pipe extractor plugin %s
does not exist",
sourcePluginName);
- LOGGER.warn(exceptionMessage);
+ LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 0b3acb4058e..929024a0689 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -185,7 +185,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
String.format(
"Failed to create pipe %s, %s",
createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
- LOGGER.warn(exceptionMessage);
+ LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -205,7 +205,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(),
PIPE_NOT_EXIST_MSG);
- LOGGER.warn(exceptionMessage);
+ LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index e3a4719a719..f4fa738428d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -145,7 +145,7 @@ public class CreatePipePluginProcedure extends
AbstractNodeProcedure<CreatePipeP
}
} catch (PipeException e) {
// The pipe plugin has already created, we should end the procedure
- LOGGER.warn(
+ LOGGER.info(
"Pipe plugin {} is already created, end the
CreatePipePluginProcedure({})",
pluginName,
pluginName);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 665a3782a91..efbe1ee6ccd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -143,7 +143,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
subscriptionInfo.validatePipePluginUsageByTopic(pluginName);
} catch (PipeException e) {
// if the pipe plugin is a built-in plugin, we should not drop it
- LOGGER.warn(e.getMessage());
+ LOGGER.info(e.getMessage());
pipePluginCoordinator.unlock();
pipeTaskCoordinator.unlock();
setFailure(new ProcedureException(e.getMessage()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 3627e3f82af..52c85334c18 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1071,7 +1071,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final Visibility pluginVisibility =
VisibilityUtils.calculateFromPluginClass(clazz);
final boolean isTableModel = createPipePluginStatement.isTableModel();
if (!VisibilityUtils.isCompatible(pluginVisibility, isTableModel)) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to create PipePlugin({}) because this plugin is not
designed for {} model.",
createPipePluginStatement.getPluginName(),
isTableModel ? "table" : "tree");
@@ -1151,10 +1151,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition())
.setIsTableModel(dropPipePluginStatement.isTableModel()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
executionStatus.getCode()) {
- LOGGER.warn(
- "[{}] Failed to drop pipe plugin {}.",
- executionStatus,
- dropPipePluginStatement.getPluginName());
+ if (TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() !=
executionStatus.getCode()) {
+ LOGGER.warn(
+ "[{}] Failed to drop pipe plugin {}.",
+ executionStatus,
+ dropPipePluginStatement.getPluginName());
+ }
future.setException(new IoTDBException(executionStatus));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index b70d51cca95..34d2e9bf82c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -117,6 +117,7 @@ public class ErrorHandlingUtils {
|| status.getCode() ==
TSStatusCode.EXECUTE_UDF_ERROR.getStatusCode()
|| status.getCode() ==
TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()
|| status.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+ || status.getCode() ==
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode())
{
LOGGER.info(message);
} else {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index af75e6424f7..5f7156f6324 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.commons.pipe.agent.task.subtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -33,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,7 +271,13 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
protected void handleException(final Event event, final Exception e) {
- if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+ if (e instanceof PipeRuntimeOutOfMemoryCriticalException
+ || ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
+ PipeLogger.log(
+ LOGGER::info,
+ e,
+ "Temporarily out of memory in pipe event transferring, will wait for
the memory to release.");
+ } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException)
{
if (lastExceptionTime == Long.MAX_VALUE) {
lastExceptionTime = System.currentTimeMillis();
}