This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d6d57559075 Prevent pipe sink task id from logging secrets (#17981)
d6d57559075 is described below

commit d6d57559075261ccdc26d246c22d04d13378f541
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 18 14:18:40 2026 +0800

    Prevent pipe sink task id from logging secrets (#17981)
    
    * Prevent pipe sink task id from logging secrets
    
    * Fix pipe sink subtask compatibility
---
 .../api/customizer/parameter/PipeParameters.java   | 10 +++--
 .../subtask/processor/PipeProcessorSubtask.java    |  4 +-
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 36 ++++++++++++++++--
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  8 ++--
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 14 ++++---
 .../metric/schema/PipeSchemaRegionSinkMetrics.java | 12 +++++-
 .../metric/sink/PipeDataRegionSinkMetrics.java     | 15 ++++++--
 .../agent/task/PipeSinkSubtaskExecutorTest.java    |  1 +
 .../task/subtask/sink/PipeSinkSubtaskTest.java     | 44 ++++++++++++++++++++++
 .../agent/task/execution/PipeSubtaskExecutor.java  | 10 ++---
 .../task/subtask/PipeAbstractSinkSubtask.java      |  4 +-
 .../agent/task/subtask/PipeReportableSubtask.java  | 16 ++++----
 .../pipe/agent/task/subtask/PipeSubtask.java       |  6 ++-
 13 files changed, 140 insertions(+), 40 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index f9ef2a64a83..7286d120991 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeCo
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -434,13 +435,14 @@ public class PipeParameters {
     }
 
     static String hide(final String key, final String value) {
-      if (Objects.isNull(key)) {
-        return value;
-      }
-      if (KEYS.contains(KeyReducer.reduce(key))) {
+      if (isHiddenKey(key)) {
         return PLACEHOLDER;
       }
       return value;
     }
+
+    public static boolean isHiddenKey(final String key) {
+      return Objects.nonNull(key) && KEYS.contains(KeyReducer.reduce(key).toLowerCase(Locale.ROOT));
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index fe9737b0d7d..c37aab5af2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -260,7 +260,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
         throw new PipeException(
             String.format(
                 "Exception in pipe process, subtask: %s, last event: %s, root 
cause: %s",
-                taskID,
+                getDisplayTaskID(),
                 lastEvent instanceof EnrichedEvent
                     ? ((EnrichedEvent) lastEvent).coreReportMessage()
                     : lastEvent,
@@ -300,7 +300,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     } catch (final Exception e) {
       LOGGER.info(
           
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_PROCESSOR_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
           e);
     } finally {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 90d325f6d23..10b746778e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -60,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
 
   // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private final String attributeSortedString;
+  private final String attributeDisplayString;
   private final int sinkIndex;
 
   // Now parallel connectors run the same time, thus the heartbeat events are 
not sure
@@ -75,8 +76,27 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask 
{
       final int sinkIndex,
       final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
       final PipeConnector outputPipeConnector) {
+    this(
+        taskID,
+        creationTime,
+        attributeSortedString,
+        attributeSortedString,
+        sinkIndex,
+        inputPendingQueue,
+        outputPipeConnector);
+  }
+
+  public PipeSinkSubtask(
+      final String taskID,
+      final long creationTime,
+      final String attributeSortedString,
+      final String attributeDisplayString,
+      final int sinkIndex,
+      final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
+      final PipeConnector outputPipeConnector) {
     super(taskID, creationTime, outputPipeConnector);
     this.attributeSortedString = attributeSortedString;
+    this.attributeDisplayString = attributeDisplayString;
     this.sinkIndex = sinkIndex;
     this.inputPendingQueue = inputPendingQueue;
 
@@ -156,7 +176,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
           DataNodePipeMessages.PIPECONNECTOR
               + outputPipeSink.getClass().getName()
               + "(id: "
-              + taskID
+              + getDisplayTaskID()
               + ")"
               + " heartbeat failed, or encountered failure when transferring 
generic event. Failure: "
               + e.getMessage(),
@@ -181,13 +201,13 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       outputPipeSink.close();
       LOGGER.info(
           DataNodePipeMessages.PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS,
-          taskID,
+          getDisplayTaskID(),
           outputPipeSink,
           System.currentTimeMillis() - startTime);
     } catch (final Exception e) {
       LOGGER.info(
           
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_CONNECTOR_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
           e);
     } finally {
@@ -366,4 +386,14 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     lastExceptionTime = Long.MAX_VALUE;
     PipeDataNodeAgent.runtime().report(event, exception);
   }
+
+  @Override
+  public String getDisplayTaskID() {
+    return generateDisplayTaskID(attributeDisplayString, creationTime, sinkIndex);
+  }
+
+  static String generateDisplayTaskID(
+      final String attributeDisplayString, final long creationTime, final int sinkIndex) {
+    return String.format("%s_%s_%s", attributeDisplayString, creationTime, sinkIndex);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 42b1ae91366..b4933234659 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -75,7 +75,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     registeredTaskCount++;
     LOGGER.info(
         
DataNodePipeMessages.REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-        subtask,
+        subtask.getDisplayTaskID(),
         runningTaskCount,
         registeredTaskCount);
   }
@@ -112,7 +112,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
       registeredTaskCount--;
       LOGGER.info(
           
DataNodePipeMessages.DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-          subtask,
+          subtask.getDisplayTaskID(),
           runningTaskCount,
           registeredTaskCount);
     }
@@ -135,7 +135,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     runningTaskCount++;
     LOGGER.info(
         
DataNodePipeMessages.START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-        subtask,
+        subtask.getDisplayTaskID(),
         runningTaskCount,
         registeredTaskCount);
   }
@@ -152,7 +152,7 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     runningTaskCount--;
     LOGGER.info(
         DataNodePipeMessages.STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
-        subtask,
+        subtask.getDisplayTaskID(),
         runningTaskCount,
         registeredTaskCount);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 36c024090a8..cca6de44f0b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -148,8 +148,7 @@ public class PipeSinkSubtaskManager {
       for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
         final String taskID =
             String.format(
-                "%s_%s_%s",
-                attributeDisplayStringWithPrefix, 
environment.getCreationTime(), sinkIndex);
+                "%s_%s_%s", attributeSortedString, 
environment.getCreationTime(), sinkIndex);
         environment.setSinkTaskId(taskID);
 
         final PipeConnector pipeSink =
@@ -184,6 +183,7 @@ public class PipeSinkSubtaskManager {
             new PipeSinkSubtask(
                 taskID,
                 environment.getCreationTime(),
+                attributeSortedString,
                 attributeDisplayStringWithPrefix,
                 sinkIndex,
                 pendingQueue,
@@ -293,13 +293,15 @@ public class PipeSinkSubtaskManager {
     return sortedStringSourceMap.toString();
   }
 
-  /** Masked attribute string for logs, metrics and exception messages. */
-  private static String generateAttributeDisplayString(
-      final PipeParameters pipeConnectorParameters) {
+  /**
+   * Attribute string for logs, metrics and exception messages with sensitive attributes removed.
+   */
+  static String generateAttributeDisplayString(final PipeParameters pipeConnectorParameters) {
     final TreeMap<String, String> filteredAttributes =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
     filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
-    return new PipeParameters(filteredAttributes).toString();
+    filteredAttributes.keySet().removeIf(PipeParameters.ValueHider::isHiddenKey);
+    return filteredAttributes.toString();
   }
 
   private void throwNoSuchSubtaskException(final String attributeSortedString) 
{
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
index d9b79fdda8e..17ff545251c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
@@ -110,7 +110,9 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
 
   public void deregister(final String taskID) {
     if (!connectorMap.containsKey(taskID)) {
-      
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
 taskID);
+      LOGGER.warn(
+          
DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
+          getDisplayTaskID(taskID));
       return;
     }
     if (Objects.nonNull(metricService)) {
@@ -125,12 +127,18 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
     }
     final Rate rate = schemaRateMap.get(taskID);
     if (rate == null) {
-      
LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE, 
taskID);
+      LOGGER.info(
+          DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE, 
getDisplayTaskID(taskID));
       return;
     }
     rate.mark();
   }
 
+  private String getDisplayTaskID(final String taskID) {
+    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    return Objects.nonNull(connector) ? connector.getDisplayTaskID() : "unknown";
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeSchemaRegionSinkMetricsHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 9b2f876ff2d..41f08f055c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -447,7 +447,9 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
 
   public void deregister(final String taskID) {
     if (!sinkMap.containsKey(taskID)) {
-      
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK, 
taskID);
+      LOGGER.warn(
+          DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK,
+          getDisplayTaskID(taskID));
       return;
     }
     if (Objects.nonNull(metricService)) {
@@ -462,7 +464,8 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     }
     final Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK, 
taskID);
+      LOGGER.info(
+          DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK, 
getDisplayTaskID(taskID));
       return;
     }
     rate.mark();
@@ -474,12 +477,18 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     }
     final Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1, 
taskID);
+      LOGGER.info(
+          DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1, 
getDisplayTaskID(taskID));
       return;
     }
     rate.mark();
   }
 
+  private String getDisplayTaskID(final String taskID) {
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
+    return Objects.nonNull(sink) ? sink.getDisplayTaskID() : "unknown";
+  }
+
   public void markPipeHeartbeatEvent(final String taskID) {
     if (Objects.isNull(metricService)) {
       return;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
index a237008ab4e..9cd54b425cf 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
@@ -41,6 +41,7 @@ public class PipeSinkSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
                 "PipeConnectorSubtaskExecutorTest",
                 System.currentTimeMillis(),
                 "TestAttributeSortedString",
+                "TestAttributeSortedString",
                 0,
                 mock(UnboundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index 2a15fb9ea18..e9d76a4fc7e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -23,12 +23,18 @@ import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 public class PipeSinkSubtaskTest {
@@ -47,6 +53,7 @@ public class PipeSinkSubtaskTest {
                 "PipeSinkSubtaskTest",
                 System.currentTimeMillis(),
                 "data_test",
+                "data_test",
                 0,
                 (UnboundedBlockingPendingQueue) pendingQueue,
                 connector));
@@ -60,4 +67,41 @@ public class PipeSinkSubtaskTest {
       subtask.close();
     }
   }
+
+  @Test
+  public void testTransferExceptionUsesDisplayTaskID() throws Exception {
+    final PipeConnector connector = mock(PipeConnector.class);
+    final UnboundedBlockingPendingQueue<Event> pendingQueue =
+        mock(UnboundedBlockingPendingQueue.class);
+    final Event event = mock(Event.class);
+
+    when(pendingQueue.waitedPoll()).thenReturn(event);
+    doThrow(new RuntimeException("No more authentication methods available"))
+        .when(connector)
+        .transfer(any(Event.class));
+
+    final PipeSinkSubtask subtask =
+        new PipeSinkSubtask(
+            "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}_1701687309493_0",
+            1701687309493L,
+            "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}",
+            "data_{sink=TSFILE_REMOTE_SINK, sink.scp.host=172.20.70.119}",
+            0,
+            pendingQueue,
+            connector);
+
+    try {
+      subtask.executeOnce();
+      Assert.fail();
+    } catch (final PipeException e) {
+      Assert.assertTrue(e.getMessage().contains("Exception in pipe transfer, subtask: data_{"));
+      Assert.assertTrue(e.getMessage().contains("sink=TSFILE_REMOTE_SINK"));
+      Assert.assertTrue(e.getMessage().contains("sink.scp.host=172.20.70.119"));
+      Assert.assertTrue(e.getMessage().contains("No more authentication methods available"));
+      Assert.assertFalse(e.getMessage().contains("sink.scp.password"));
+      Assert.assertFalse(e.getMessage().contains("Iotdb@2026"));
+    } finally {
+      subtask.close();
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index c28cfb25692..f14bb483e53 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -99,7 +99,7 @@ public abstract class PipeSubtaskExecutor {
 
   public final synchronized void register(final PipeSubtask subtask) {
     if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
-      LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED, 
getSafeSubtaskStr(subtask.getTaskID()));
+      LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED, 
subtask.getDisplayTaskID());
       return;
     }
 
@@ -125,13 +125,13 @@ public abstract class PipeSubtaskExecutor {
     final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
     if (subtask.isSubmittingSelf()) {
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING, 
getSafeSubtaskStr(subTaskID));
+        LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING, 
subtask.getDisplayTaskID());
       }
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
       ++runningSubtaskNumber;
-      LOGGER.info(PipeMessages.SUBTASK_STARTED, getSafeSubtaskStr(subTaskID));
+      LOGGER.info(PipeMessages.SUBTASK_STARTED, subtask.getDisplayTaskID());
     }
   }
 
@@ -154,9 +154,9 @@ public abstract class PipeSubtaskExecutor {
     if (subtask != null) {
       try {
         subtask.close();
-        LOGGER.info(PipeMessages.SUBTASK_CLOSED, getSafeSubtaskStr(subTaskID));
+        LOGGER.info(PipeMessages.SUBTASK_CLOSED, subtask.getDisplayTaskID());
       } catch (final Exception e) {
-        LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED, 
getSafeSubtaskStr(subTaskID), e);
+        LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED, 
subtask.getDisplayTaskID(), e);
       }
     }
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 790e5fe4c6d..9dbf5af2d0c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -199,7 +199,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
           PipeMessages.HANDSHAKE_FAILED_STOPPING,
           outputPipeSink.getClass().getName(),
           MAX_RETRY_TIMES,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable);
@@ -287,7 +287,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
         throw new PipeException(
             String.format(
                 PipeMessages.EXCEPTION_IN_PIPE_TRANSFER_FORMAT,
-                taskID,
+                getDisplayTaskID(),
                 event instanceof EnrichedEvent
                     ? ((EnrichedEvent) event).coreReportMessage()
                     : event,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 1f43403157b..52bc3bd61f8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -88,7 +88,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           PipeMessages.FAILED_TO_EXECUTE_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable.getMessage(),
@@ -102,7 +102,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
           LOGGER::warn,
           throwable,
           PipeMessages.RETRY_EXECUTING_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
@@ -113,7 +113,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
       } catch (final InterruptedException e) {
         LOGGER.warn(
             PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
-            taskID,
+            getDisplayTaskID(),
             creationTime,
             this.getClass().getSimpleName(),
             e);
@@ -125,7 +125,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
       final String errorMessage =
           String.format(
               PipeMessages.SUBTASK_RETRY_EXCEEDED_FORMAT,
-              taskID,
+              getDisplayTaskID(),
               creationTime,
               this.getClass().getSimpleName(),
               retryCount.get() - 1,
@@ -139,7 +139,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
               : new PipeRuntimeCriticalException(errorMessage));
       LOGGER.warn(
           PipeMessages.SUBTASK_EXCEPTION_REPORTED,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable);
@@ -154,7 +154,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           PipeMessages.FAILED_TO_EXECUTE_SUBTASK_RETRY_FOREVER,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName(),
           throwable.getMessage(),
@@ -165,7 +165,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     PipeLogger.log(
         LOGGER::warn,
         PipeMessages.RETRY_EXECUTING_SUBTASK_FOREVER,
-        taskID,
+        getDisplayTaskID(),
         creationTime,
         this.getClass().getSimpleName(),
         retryCount.get(),
@@ -176,7 +176,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     } catch (final InterruptedException e) {
       LOGGER.warn(
           PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
-          taskID,
+          getDisplayTaskID(),
           creationTime,
           this.getClass().getSimpleName());
       Thread.currentThread().interrupt();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 2da797c2b3b..29f651a1ac9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -114,7 +114,7 @@ public abstract class PipeSubtask
     if (totalRetryCount != 0) {
       LOGGER.warn(
           "Successfully executed subtask {}({}) after {} retries.",
-          taskID,
+          getDisplayTaskID(),
           this.getClass().getSimpleName(),
           totalRetryCount);
     }
@@ -196,6 +196,10 @@ public abstract class PipeSubtask
     return taskID;
   }
 
+  public String getDisplayTaskID() {
+    return taskID;
+  }
+
   public long getCreationTime() {
     return creationTime;
   }

Reply via email to