This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch rc/2.0.10
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/2.0.10 by this push:
     new f8b17f84e69 Prevent pipe sink task id from logging secrets (#17981)
f8b17f84e69 is described below

commit f8b17f84e69a431fc621e75012cdb181767b3707
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jun 29 17:41:05 2026 +0800

    Prevent pipe sink task id from logging secrets (#17981)
---
 .../manual/IoTDBPipeTypeConversionISessionIT.java  |  3 +-
 .../api/customizer/parameter/PipeParameters.java   | 10 ++--
 .../subtask/processor/PipeProcessorSubtask.java    |  4 +-
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 36 +++++++++++--
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  8 +--
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 62 ++++++++++++++++------
 .../metric/schema/PipeSchemaRegionSinkMetrics.java | 12 ++++-
 .../metric/sink/PipeDataRegionSinkMetrics.java     | 15 ++++--
 .../agent/task/PipeSinkSubtaskExecutorTest.java    |  1 +
 .../task/subtask/sink/PipeSinkSubtaskTest.java     | 44 +++++++++++++++
 .../agent/task/execution/PipeSubtaskExecutor.java  | 10 ++--
 .../task/subtask/PipeAbstractSinkSubtask.java      |  4 +-
 .../agent/task/subtask/PipeReportableSubtask.java  | 16 +++---
 .../pipe/agent/task/subtask/PipeSubtask.java       |  6 ++-
 14 files changed, 180 insertions(+), 51 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
index 54f4fe2c51f..f2985389d72 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -56,6 +56,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
@@ -324,7 +325,7 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeDualTreeModel
         generateMeasurementSchemas();
 
     // Generate createTimeSeries in sender and receiver
-    String uuid = "bcdedit";
+    String uuid = "bcdedit" + UUID.randomUUID().toString().replace("-", "");
     for (Pair<MeasurementSchema, MeasurementSchema> pair : measurementSchemas) 
{
       createTimeSeries(uuid, pair.left.getMeasurementName(), 
pair.left.getType().name(), senderEnv);
       createTimeSeries(
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c13f87ae579..ac882f6ea4f 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeCo
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -433,13 +434,14 @@ public class PipeParameters {
     }
 
     static String hide(final String key, final String value) {
-      if (Objects.isNull(key)) {
-        return value;
-      }
-      if (KEYS.contains(KeyReducer.reduce(key))) {
+      if (isHiddenKey(key)) {
         return PLACEHOLDER;
       }
       return value;
     }
+
+    public static boolean isHiddenKey(final String key) {
+      return Objects.nonNull(key) && 
KEYS.contains(KeyReducer.reduce(key).toLowerCase(Locale.ROOT));
+    }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index fe9737b0d7d..c37aab5af2c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -260,7 +260,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
         throw new PipeException(
             String.format(
                 "Exception in pipe process, subtask: %s, last event: %s, root cause: %s",
-                taskID,
+                getDisplayTaskID(),
                 lastEvent instanceof EnrichedEvent
                     ? ((EnrichedEvent) lastEvent).coreReportMessage()
                     : lastEvent,
@@ -300,7 +300,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     } catch (final Exception e) {
       LOGGER.info(
           
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_PROCESSOR_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
           e);
     } finally {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 90d325f6d23..10b746778e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -60,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
 
   // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private final String attributeSortedString;
+  private final String attributeDisplayString;
   private final int sinkIndex;
 
   // Now parallel connectors run the same time, thus the heartbeat events are 
not sure
@@ -75,8 +76,27 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask 
{
       final int sinkIndex,
       final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
       final PipeConnector outputPipeConnector) {
+    this(
+        taskID,
+        creationTime,
+        attributeSortedString,
+        attributeSortedString,
+        sinkIndex,
+        inputPendingQueue,
+        outputPipeConnector);
+  }
+
+  public PipeSinkSubtask(
+      final String taskID,
+      final long creationTime,
+      final String attributeSortedString,
+      final String attributeDisplayString,
+      final int sinkIndex,
+      final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
+      final PipeConnector outputPipeConnector) {
     super(taskID, creationTime, outputPipeConnector);
     this.attributeSortedString = attributeSortedString;
+    this.attributeDisplayString = attributeDisplayString;
     this.sinkIndex = sinkIndex;
     this.inputPendingQueue = inputPendingQueue;
 
@@ -156,7 +176,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
           DataNodePipeMessages.PIPECONNECTOR
               + outputPipeSink.getClass().getName()
               + "(id: "
-              + taskID
+              + getDisplayTaskID()
               + ")"
               + " heartbeat failed, or encountered failure when transferring 
generic event. Failure: "
               + e.getMessage(),
@@ -181,13 +201,13 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       outputPipeSink.close();
       LOGGER.info(
           DataNodePipeMessages.PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS,
-          taskID,
+          getDisplayTaskID(),
           outputPipeSink,
           System.currentTimeMillis() - startTime);
     } catch (final Exception e) {
       LOGGER.info(
           
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_CONNECTOR_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
           e);
     } finally {
@@ -366,4 +386,14 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     lastExceptionTime = Long.MAX_VALUE;
     PipeDataNodeAgent.runtime().report(event, exception);
   }
+
+  @Override
+  public String getDisplayTaskID() {
+    return generateDisplayTaskID(attributeDisplayString, creationTime, 
sinkIndex);
+  }
+
+  static String generateDisplayTaskID(
+      final String attributeDisplayString, final long creationTime, final int 
sinkIndex) {
+    return String.format("%s_%s_%s", attributeDisplayString, creationTime, 
sinkIndex);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 42b1ae91366..b4933234659 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -75,7 +75,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     registeredTaskCount++;
     LOGGER.info(
         
DataNodePipeMessages.REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-        subtask,
+        subtask.getDisplayTaskID(),
         runningTaskCount,
         registeredTaskCount);
   }
@@ -112,7 +112,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
       registeredTaskCount--;
       LOGGER.info(
           
DataNodePipeMessages.DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-          subtask,
+          subtask.getDisplayTaskID(),
           runningTaskCount,
           registeredTaskCount);
     }
@@ -135,7 +135,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     runningTaskCount++;
     LOGGER.info(
         
DataNodePipeMessages.START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-        subtask,
+        subtask.getDisplayTaskID(),
         runningTaskCount,
         registeredTaskCount);
   }
@@ -152,7 +152,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     runningTaskCount--;
     LOGGER.info(
         DataNodePipeMessages.STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-        subtask,
+        subtask.getDisplayTaskID(),
         runningTaskCount,
         registeredTaskCount);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 3ad99ca5c06..564a29d51cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -64,6 +64,8 @@ public class PipeSinkSubtaskManager {
   private final Map<String, List<PipeSinkSubtaskLifeCycle>>
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
+  private final Map<String, String> attributeSortedString2DisplayString = new 
HashMap<>();
+
   public synchronized String register(
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
       final PipeParameters pipeSinkParameters,
@@ -92,19 +94,14 @@ public class PipeSinkSubtaskManager {
     final int sinkNum;
     boolean realTimeFirst = false;
     String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
+    final String attributeDisplayString = 
generateAttributeDisplayString(pipeSinkParameters);
     if (isDataRegionSink) {
       sinkNum =
           pipeSinkParameters.getIntOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
                   PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-              PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
-                      pipeSinkParameters
-                          .getStringOrDefault(
-                              Arrays.asList(
-                                  PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                              
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
-                          .toLowerCase())
+              
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
                   ? 1
                   : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
@@ -120,7 +117,9 @@ public class PipeSinkSubtaskManager {
       sinkNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
-    environment.setAttributeSortedString(attributeSortedString);
+    final String attributeDisplayStringWithPrefix =
+        isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + 
attributeDisplayString;
+    environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final PipeSinkSubtaskExecutor executor = executorSupplier.get();
@@ -138,6 +137,11 @@ public class PipeSinkSubtaskManager {
       }
 
       for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
+        final String taskID =
+            String.format(
+                "%s_%s_%s",
+                attributeDisplayStringWithPrefix, 
environment.getCreationTime(), sinkIndex);
+
         final PipeConnector pipeSink =
             isDataRegionSink
                 ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
@@ -168,10 +172,10 @@ public class PipeSinkSubtaskManager {
         // 2. Construct PipeConnectorSubtaskLifeCycle to manage 
PipeConnectorSubtask's life cycle
         final PipeSinkSubtask pipeSinkSubtask =
             new PipeSinkSubtask(
-                String.format(
-                    "%s_%s_%s", attributeSortedString, 
environment.getCreationTime(), sinkIndex),
+                taskID,
                 environment.getCreationTime(),
                 attributeSortedString,
+                attributeDisplayStringWithPrefix,
                 sinkIndex,
                 pendingQueue,
                 pipeSink);
@@ -182,11 +186,13 @@ public class PipeSinkSubtaskManager {
 
       LOGGER.info(
           DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED,
-          attributeSortedString,
+          attributeDisplayStringWithPrefix,
           executor.getWorkingThreadName(),
           executor.getCallbackThreadName());
       attributeSortedString2SubtaskLifeCycleMap.put(
           attributeSortedString, pipeSinkSubtaskLifeCycleList);
+      attributeSortedString2DisplayString.put(
+          attributeSortedString, attributeDisplayStringWithPrefix);
     }
 
     for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -203,7 +209,7 @@ public class PipeSinkSubtaskManager {
       final int regionId,
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
+      throwNoSuchSubtaskException(attributeSortedString);
     }
 
     final List<PipeSinkSubtaskLifeCycle> lifeCycles =
@@ -219,6 +225,7 @@ public class PipeSinkSubtaskManager {
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
+      attributeSortedString2DisplayString.remove(attributeSortedString);
       executor.shutdown();
       LOGGER.info(
           DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN,
@@ -234,7 +241,7 @@ public class PipeSinkSubtaskManager {
 
   public synchronized void start(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
+      throwNoSuchSubtaskException(attributeSortedString);
     }
 
     for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -245,7 +252,7 @@ public class PipeSinkSubtaskManager {
 
   public synchronized void stop(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
+      throwNoSuchSubtaskException(attributeSortedString);
     }
 
     for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -258,7 +265,8 @@ public class PipeSinkSubtaskManager {
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
-          DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + 
attributeSortedString);
+          DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK
+              + getDisplayStringForException(attributeSortedString));
     }
 
     // All subtasks share the same pending queue
@@ -268,13 +276,35 @@ public class PipeSinkSubtaskManager {
         .getPendingQueue();
   }
 
-  private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
+  private static String generateAttributeSortedString(
+      final PipeParameters pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
     sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
     return sortedStringSourceMap.toString();
   }
 
+  /**
+   * Attribute string for logs, metrics and exception messages with sensitive 
attributes removed.
+   */
+  static String generateAttributeDisplayString(final PipeParameters 
pipeConnectorParameters) {
+    final TreeMap<String, String> filteredAttributes =
+        new TreeMap<>(pipeConnectorParameters.getAttribute());
+    filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
+    
filteredAttributes.keySet().removeIf(PipeParameters.ValueHider::isHiddenKey);
+    return filteredAttributes.toString();
+  }
+
+  private void throwNoSuchSubtaskException(final String attributeSortedString) 
{
+    throw new PipeException(
+        FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE
+            + getDisplayStringForException(attributeSortedString));
+  }
+
+  private String getDisplayStringForException(final String 
attributeSortedString) {
+    return 
attributeSortedString2DisplayString.getOrDefault(attributeSortedString, 
"unknown");
+  }
+
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private PipeSinkSubtaskManager() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
index d9b79fdda8e..17ff545251c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
@@ -110,7 +110,9 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
 
   public void deregister(final String taskID) {
     if (!connectorMap.containsKey(taskID)) {
-      
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
 taskID);
+      LOGGER.warn(
+          
DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
+          getDisplayTaskID(taskID));
       return;
     }
     if (Objects.nonNull(metricService)) {
@@ -125,12 +127,18 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
     }
     final Rate rate = schemaRateMap.get(taskID);
     if (rate == null) {
-      
LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE, 
taskID);
+      LOGGER.info(
+          DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE, 
getDisplayTaskID(taskID));
       return;
     }
     rate.mark();
   }
 
+  private String getDisplayTaskID(final String taskID) {
+    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    return Objects.nonNull(connector) ? connector.getDisplayTaskID() : 
"unknown";
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeSchemaRegionSinkMetricsHolder {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index dd7707d1b96..003f6f45ef4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -447,7 +447,9 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
 
   public void deregister(final String taskID) {
     if (!sinkMap.containsKey(taskID)) {
-      
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK, 
taskID);
+      LOGGER.warn(
+          DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK,
+          getDisplayTaskID(taskID));
       return;
     }
     if (Objects.nonNull(metricService)) {
@@ -462,7 +464,8 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     }
     final Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK, 
taskID);
+      LOGGER.info(
+          DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK, 
getDisplayTaskID(taskID));
       return;
     }
     rate.mark();
@@ -474,12 +477,18 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     }
     final Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1, 
taskID);
+      LOGGER.info(
+          DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1, 
getDisplayTaskID(taskID));
       return;
     }
     rate.mark();
   }
 
+  private String getDisplayTaskID(final String taskID) {
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
+    return Objects.nonNull(sink) ? sink.getDisplayTaskID() : "unknown";
+  }
+
   public void markPipeHeartbeatEvent(final String taskID) {
     if (Objects.isNull(metricService)) {
       return;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
index a237008ab4e..9cd54b425cf 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
@@ -41,6 +41,7 @@ public class PipeSinkSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
                 "PipeConnectorSubtaskExecutorTest",
                 System.currentTimeMillis(),
                 "TestAttributeSortedString",
+                "TestAttributeSortedString",
                 0,
                 mock(UnboundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index 2a15fb9ea18..e9d76a4fc7e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -23,12 +23,18 @@ import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 public class PipeSinkSubtaskTest {
@@ -47,6 +53,7 @@ public class PipeSinkSubtaskTest {
                 "PipeSinkSubtaskTest",
                 System.currentTimeMillis(),
                 "data_test",
+                "data_test",
                 0,
                 (UnboundedBlockingPendingQueue) pendingQueue,
                 connector));
@@ -60,4 +67,41 @@ public class PipeSinkSubtaskTest {
       subtask.close();
     }
   }
+
+  @Test
+  public void testTransferExceptionUsesDisplayTaskID() throws Exception {
+    final PipeConnector connector = mock(PipeConnector.class);
+    final UnboundedBlockingPendingQueue<Event> pendingQueue =
+        mock(UnboundedBlockingPendingQueue.class);
+    final Event event = mock(Event.class);
+
+    when(pendingQueue.waitedPoll()).thenReturn(event);
+    doThrow(new RuntimeException("No more authentication methods available"))
+        .when(connector)
+        .transfer(any(Event.class));
+
+    final PipeSinkSubtask subtask =
+        new PipeSinkSubtask(
+            "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}_1701687309493_0",
+            1701687309493L,
+            "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}",
+            "data_{sink=TSFILE_REMOTE_SINK, sink.scp.host=172.20.70.119}",
+            0,
+            pendingQueue,
+            connector);
+
+    try {
+      subtask.executeOnce();
+      Assert.fail();
+    } catch (final PipeException e) {
+      Assert.assertTrue(e.getMessage().contains("Exception in pipe transfer, subtask: data_{"));
+      Assert.assertTrue(e.getMessage().contains("sink=TSFILE_REMOTE_SINK"));
+      Assert.assertTrue(e.getMessage().contains("sink.scp.host=172.20.70.119"));
+      Assert.assertTrue(e.getMessage().contains("No more authentication methods available"));
+      Assert.assertFalse(e.getMessage().contains("sink.scp.password"));
+      Assert.assertFalse(e.getMessage().contains("Iotdb@2026"));
+    } finally {
+      subtask.close();
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index c28cfb25692..f14bb483e53 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -99,7 +99,7 @@ public abstract class PipeSubtaskExecutor {
 
   public final synchronized void register(final PipeSubtask subtask) {
     if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
-      LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED, 
getSafeSubtaskStr(subtask.getTaskID()));
+      LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED, 
subtask.getDisplayTaskID());
       return;
     }
 
@@ -125,13 +125,13 @@ public abstract class PipeSubtaskExecutor {
     final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
     if (subtask.isSubmittingSelf()) {
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING, 
getSafeSubtaskStr(subTaskID));
+        LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING, 
subtask.getDisplayTaskID());
       }
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
       ++runningSubtaskNumber;
-      LOGGER.info(PipeMessages.SUBTASK_STARTED, getSafeSubtaskStr(subTaskID));
+      LOGGER.info(PipeMessages.SUBTASK_STARTED, subtask.getDisplayTaskID());
     }
   }
 
@@ -154,9 +154,9 @@ public abstract class PipeSubtaskExecutor {
     if (subtask != null) {
       try {
         subtask.close();
-        LOGGER.info(PipeMessages.SUBTASK_CLOSED, getSafeSubtaskStr(subTaskID));
+        LOGGER.info(PipeMessages.SUBTASK_CLOSED, subtask.getDisplayTaskID());
       } catch (final Exception e) {
-        LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED, 
getSafeSubtaskStr(subTaskID), e);
+        LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED, 
subtask.getDisplayTaskID(), e);
       }
     }
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 790e5fe4c6d..9dbf5af2d0c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -199,7 +199,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
           PipeMessages.HANDSHAKE_FAILED_STOPPING,
           outputPipeSink.getClass().getName(),
           MAX_RETRY_TIMES,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable);
@@ -287,7 +287,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
         throw new PipeException(
             String.format(
                 PipeMessages.EXCEPTION_IN_PIPE_TRANSFER_FORMAT,
-                taskID,
+                getDisplayTaskID(),
                 event instanceof EnrichedEvent
                     ? ((EnrichedEvent) event).coreReportMessage()
                     : event,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 1f43403157b..52bc3bd61f8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -88,7 +88,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           PipeMessages.FAILED_TO_EXECUTE_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable.getMessage(),
@@ -102,7 +102,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
           LOGGER::warn,
           throwable,
           PipeMessages.RETRY_EXECUTING_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
@@ -113,7 +113,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
       } catch (final InterruptedException e) {
         LOGGER.warn(
             PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
-            taskID,
+            getDisplayTaskID(),
             creationTime,
             this.getClass().getSimpleName(),
             e);
@@ -125,7 +125,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
       final String errorMessage =
           String.format(
               PipeMessages.SUBTASK_RETRY_EXCEEDED_FORMAT,
-              taskID,
+              getDisplayTaskID(),
               creationTime,
               this.getClass().getSimpleName(),
               retryCount.get() - 1,
@@ -139,7 +139,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
               : new PipeRuntimeCriticalException(errorMessage));
       LOGGER.warn(
           PipeMessages.SUBTASK_EXCEPTION_REPORTED,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable);
@@ -154,7 +154,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           PipeMessages.FAILED_TO_EXECUTE_SUBTASK_RETRY_FOREVER,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable.getMessage(),
@@ -165,7 +165,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     PipeLogger.log(
         LOGGER::warn,
         PipeMessages.RETRY_EXECUTING_SUBTASK_FOREVER,
-        taskID,
+        getDisplayTaskID(),
         creationTime,
         this.getClass().getSimpleName(),
         retryCount.get(),
@@ -176,7 +176,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     } catch (final InterruptedException e) {
       LOGGER.warn(
           PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName());
       Thread.currentThread().interrupt();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 2da797c2b3b..29f651a1ac9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -114,7 +114,7 @@ public abstract class PipeSubtask
     if (totalRetryCount != 0) {
       LOGGER.warn(
           "Successfully executed subtask {}({}) after {} retries.",
-          taskID,
+          getDisplayTaskID(),
           this.getClass().getSimpleName(),
           totalRetryCount);
     }
@@ -196,6 +196,10 @@ public abstract class PipeSubtask
     return taskID;
   }
 
+  public String getDisplayTaskID() {
+    return taskID;
+  }
+
   public long getCreationTime() {
     return creationTime;
   }


Reply via email to