This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d6d57559075 Prevent pipe sink task id from logging secrets (#17981)
d6d57559075 is described below
commit d6d57559075261ccdc26d246c22d04d13378f541
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 18 14:18:40 2026 +0800
Prevent pipe sink task id from logging secrets (#17981)
* Prevent pipe sink task id from logging secrets
* Fix pipe sink subtask compatibility
---
.../api/customizer/parameter/PipeParameters.java | 10 +++--
.../subtask/processor/PipeProcessorSubtask.java | 4 +-
.../agent/task/subtask/sink/PipeSinkSubtask.java | 36 ++++++++++++++++--
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 8 ++--
.../task/subtask/sink/PipeSinkSubtaskManager.java | 14 ++++---
.../metric/schema/PipeSchemaRegionSinkMetrics.java | 12 +++++-
.../metric/sink/PipeDataRegionSinkMetrics.java | 15 ++++++--
.../agent/task/PipeSinkSubtaskExecutorTest.java | 1 +
.../task/subtask/sink/PipeSinkSubtaskTest.java | 44 ++++++++++++++++++++++
.../agent/task/execution/PipeSubtaskExecutor.java | 10 ++---
.../task/subtask/PipeAbstractSinkSubtask.java | 4 +-
.../agent/task/subtask/PipeReportableSubtask.java | 16 ++++----
.../pipe/agent/task/subtask/PipeSubtask.java | 6 ++-
13 files changed, 140 insertions(+), 40 deletions(-)
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index f9ef2a64a83..7286d120991 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeCo
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -434,13 +435,14 @@ public class PipeParameters {
}
static String hide(final String key, final String value) {
- if (Objects.isNull(key)) {
- return value;
- }
- if (KEYS.contains(KeyReducer.reduce(key))) {
+ if (isHiddenKey(key)) {
return PLACEHOLDER;
}
return value;
}
+
+ public static boolean isHiddenKey(final String key) {
+ return Objects.nonNull(key) &&
KEYS.contains(KeyReducer.reduce(key).toLowerCase(Locale.ROOT));
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index fe9737b0d7d..c37aab5af2c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -260,7 +260,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
throw new PipeException(
String.format(
"Exception in pipe process, subtask: %s, last event: %s, root
cause: %s",
- taskID,
+ getDisplayTaskID(),
lastEvent instanceof EnrichedEvent
? ((EnrichedEvent) lastEvent).coreReportMessage()
: lastEvent,
@@ -300,7 +300,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
} catch (final Exception e) {
LOGGER.info(
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_PROCESSOR_SUBTASK,
- taskID,
+ getDisplayTaskID(),
ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 90d325f6d23..10b746778e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -60,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String attributeSortedString;
+ private final String attributeDisplayString;
private final int sinkIndex;
// Now parallel connectors run the same time, thus the heartbeat events are
not sure
@@ -75,8 +76,27 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask
{
final int sinkIndex,
final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
final PipeConnector outputPipeConnector) {
+ this(
+ taskID,
+ creationTime,
+ attributeSortedString,
+ attributeSortedString,
+ sinkIndex,
+ inputPendingQueue,
+ outputPipeConnector);
+ }
+
+ public PipeSinkSubtask(
+ final String taskID,
+ final long creationTime,
+ final String attributeSortedString,
+ final String attributeDisplayString,
+ final int sinkIndex,
+ final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
+ final PipeConnector outputPipeConnector) {
super(taskID, creationTime, outputPipeConnector);
this.attributeSortedString = attributeSortedString;
+ this.attributeDisplayString = attributeDisplayString;
this.sinkIndex = sinkIndex;
this.inputPendingQueue = inputPendingQueue;
@@ -156,7 +176,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
DataNodePipeMessages.PIPECONNECTOR
+ outputPipeSink.getClass().getName()
+ "(id: "
- + taskID
+ + getDisplayTaskID()
+ ")"
+ " heartbeat failed, or encountered failure when transferring
generic event. Failure: "
+ e.getMessage(),
@@ -181,13 +201,13 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
outputPipeSink.close();
LOGGER.info(
DataNodePipeMessages.PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS,
- taskID,
+ getDisplayTaskID(),
outputPipeSink,
System.currentTimeMillis() - startTime);
} catch (final Exception e) {
LOGGER.info(
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_CONNECTOR_SUBTASK,
- taskID,
+ getDisplayTaskID(),
ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
@@ -366,4 +386,14 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
lastExceptionTime = Long.MAX_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
+
+ @Override
+ public String getDisplayTaskID() {
+ return generateDisplayTaskID(attributeDisplayString, creationTime,
sinkIndex);
+ }
+
+ static String generateDisplayTaskID(
+ final String attributeDisplayString, final long creationTime, final int
sinkIndex) {
+ return String.format("%s_%s_%s", attributeDisplayString, creationTime,
sinkIndex);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 42b1ae91366..b4933234659 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -75,7 +75,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
registeredTaskCount++;
LOGGER.info(
DataNodePipeMessages.REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
@@ -112,7 +112,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
registeredTaskCount--;
LOGGER.info(
DataNodePipeMessages.DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
@@ -135,7 +135,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
runningTaskCount++;
LOGGER.info(
DataNodePipeMessages.START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
@@ -152,7 +152,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
runningTaskCount--;
LOGGER.info(
DataNodePipeMessages.STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 36c024090a8..cca6de44f0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -148,8 +148,7 @@ public class PipeSinkSubtaskManager {
for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
final String taskID =
String.format(
- "%s_%s_%s",
- attributeDisplayStringWithPrefix,
environment.getCreationTime(), sinkIndex);
+ "%s_%s_%s", attributeSortedString,
environment.getCreationTime(), sinkIndex);
environment.setSinkTaskId(taskID);
final PipeConnector pipeSink =
@@ -184,6 +183,7 @@ public class PipeSinkSubtaskManager {
new PipeSinkSubtask(
taskID,
environment.getCreationTime(),
+ attributeSortedString,
attributeDisplayStringWithPrefix,
sinkIndex,
pendingQueue,
@@ -293,13 +293,15 @@ public class PipeSinkSubtaskManager {
return sortedStringSourceMap.toString();
}
- /** Masked attribute string for logs, metrics and exception messages. */
- private static String generateAttributeDisplayString(
- final PipeParameters pipeConnectorParameters) {
+ /**
+ * Attribute string for logs, metrics and exception messages with sensitive
attributes removed.
+ */
+ static String generateAttributeDisplayString(final PipeParameters
pipeConnectorParameters) {
final TreeMap<String, String> filteredAttributes =
new TreeMap<>(pipeConnectorParameters.getAttribute());
filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
- return new PipeParameters(filteredAttributes).toString();
+
filteredAttributes.keySet().removeIf(PipeParameters.ValueHider::isHiddenKey);
+ return filteredAttributes.toString();
}
private void throwNoSuchSubtaskException(final String attributeSortedString)
{
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
index d9b79fdda8e..17ff545251c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
@@ -110,7 +110,9 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
public void deregister(final String taskID) {
if (!connectorMap.containsKey(taskID)) {
-
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
taskID);
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
+ getDisplayTaskID(taskID));
return;
}
if (Objects.nonNull(metricService)) {
@@ -125,12 +127,18 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
}
final Rate rate = schemaRateMap.get(taskID);
if (rate == null) {
-
LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE,
taskID);
+ LOGGER.info(
+ DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE,
getDisplayTaskID(taskID));
return;
}
rate.mark();
}
+ private String getDisplayTaskID(final String taskID) {
+ final PipeSinkSubtask connector = connectorMap.get(taskID);
+ return Objects.nonNull(connector) ? connector.getDisplayTaskID() :
"unknown";
+ }
+
//////////////////////////// singleton ////////////////////////////
private static class PipeSchemaRegionSinkMetricsHolder {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 9b2f876ff2d..41f08f055c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -447,7 +447,9 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
public void deregister(final String taskID) {
if (!sinkMap.containsKey(taskID)) {
-
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK,
taskID);
+ LOGGER.warn(
+ DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK,
+ getDisplayTaskID(taskID));
return;
}
if (Objects.nonNull(metricService)) {
@@ -462,7 +464,8 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
}
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
- LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK,
taskID);
+ LOGGER.info(
+ DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK,
getDisplayTaskID(taskID));
return;
}
rate.mark();
@@ -474,12 +477,18 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
}
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
- LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1,
taskID);
+ LOGGER.info(
+ DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1,
getDisplayTaskID(taskID));
return;
}
rate.mark();
}
+ private String getDisplayTaskID(final String taskID) {
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
+ return Objects.nonNull(sink) ? sink.getDisplayTaskID() : "unknown";
+ }
+
public void markPipeHeartbeatEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
index a237008ab4e..9cd54b425cf 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
@@ -41,6 +41,7 @@ public class PipeSinkSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
"PipeConnectorSubtaskExecutorTest",
System.currentTimeMillis(),
"TestAttributeSortedString",
+ "TestAttributeSortedString",
0,
mock(UnboundedBlockingPendingQueue.class),
mock(PipeConnector.class)));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index 2a15fb9ea18..e9d76a4fc7e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -23,12 +23,18 @@ import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
public class PipeSinkSubtaskTest {
@@ -47,6 +53,7 @@ public class PipeSinkSubtaskTest {
"PipeSinkSubtaskTest",
System.currentTimeMillis(),
"data_test",
+ "data_test",
0,
(UnboundedBlockingPendingQueue) pendingQueue,
connector));
@@ -60,4 +67,41 @@ public class PipeSinkSubtaskTest {
subtask.close();
}
}
+
+ @Test
+ public void testTransferExceptionUsesDisplayTaskID() throws Exception {
+ final PipeConnector connector = mock(PipeConnector.class);
+ final UnboundedBlockingPendingQueue<Event> pendingQueue =
+ mock(UnboundedBlockingPendingQueue.class);
+ final Event event = mock(Event.class);
+
+ when(pendingQueue.waitedPoll()).thenReturn(event);
+ doThrow(new RuntimeException("No more authentication methods available"))
+ .when(connector)
+ .transfer(any(Event.class));
+
+ final PipeSinkSubtask subtask =
+ new PipeSinkSubtask(
+ "data_{sink=TSFILE_REMOTE_SINK,
sink.scp.password=Iotdb@2026}_1701687309493_0",
+ 1701687309493L,
+ "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}",
+ "data_{sink=TSFILE_REMOTE_SINK, sink.scp.host=172.20.70.119}",
+ 0,
+ pendingQueue,
+ connector);
+
+ try {
+ subtask.executeOnce();
+ Assert.fail();
+ } catch (final PipeException e) {
+ Assert.assertTrue(e.getMessage().contains("Exception in pipe transfer,
subtask: data_{"));
+ Assert.assertTrue(e.getMessage().contains("sink=TSFILE_REMOTE_SINK"));
+
Assert.assertTrue(e.getMessage().contains("sink.scp.host=172.20.70.119"));
+ Assert.assertTrue(e.getMessage().contains("No more authentication
methods available"));
+ Assert.assertFalse(e.getMessage().contains("sink.scp.password"));
+ Assert.assertFalse(e.getMessage().contains("Iotdb@2026"));
+ } finally {
+ subtask.close();
+ }
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index c28cfb25692..f14bb483e53 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -99,7 +99,7 @@ public abstract class PipeSubtaskExecutor {
public final synchronized void register(final PipeSubtask subtask) {
if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
- LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED,
getSafeSubtaskStr(subtask.getTaskID()));
+ LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED,
subtask.getDisplayTaskID());
return;
}
@@ -125,13 +125,13 @@ public abstract class PipeSubtaskExecutor {
final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
if (subtask.isSubmittingSelf()) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING,
getSafeSubtaskStr(subTaskID));
+ LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING,
subtask.getDisplayTaskID());
}
} else {
subtask.allowSubmittingSelf();
subtask.submitSelf();
++runningSubtaskNumber;
- LOGGER.info(PipeMessages.SUBTASK_STARTED, getSafeSubtaskStr(subTaskID));
+ LOGGER.info(PipeMessages.SUBTASK_STARTED, subtask.getDisplayTaskID());
}
}
@@ -154,9 +154,9 @@ public abstract class PipeSubtaskExecutor {
if (subtask != null) {
try {
subtask.close();
- LOGGER.info(PipeMessages.SUBTASK_CLOSED, getSafeSubtaskStr(subTaskID));
+ LOGGER.info(PipeMessages.SUBTASK_CLOSED, subtask.getDisplayTaskID());
} catch (final Exception e) {
- LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED,
getSafeSubtaskStr(subTaskID), e);
+ LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED,
subtask.getDisplayTaskID(), e);
}
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 790e5fe4c6d..9dbf5af2d0c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -199,7 +199,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
PipeMessages.HANDSHAKE_FAILED_STOPPING,
outputPipeSink.getClass().getName(),
MAX_RETRY_TIMES,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable);
@@ -287,7 +287,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
throw new PipeException(
String.format(
PipeMessages.EXCEPTION_IN_PIPE_TRANSFER_FORMAT,
- taskID,
+ getDisplayTaskID(),
event instanceof EnrichedEvent
? ((EnrichedEvent) event).coreReportMessage()
: event,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 1f43403157b..52bc3bd61f8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -88,7 +88,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
if (retryCount.get() == 0) {
LOGGER.warn(
PipeMessages.FAILED_TO_EXECUTE_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable.getMessage(),
@@ -102,7 +102,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
LOGGER::warn,
throwable,
PipeMessages.RETRY_EXECUTING_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
@@ -113,7 +113,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
} catch (final InterruptedException e) {
LOGGER.warn(
PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
e);
@@ -125,7 +125,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
final String errorMessage =
String.format(
PipeMessages.SUBTASK_RETRY_EXCEEDED_FORMAT,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
retryCount.get() - 1,
@@ -139,7 +139,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
: new PipeRuntimeCriticalException(errorMessage));
LOGGER.warn(
PipeMessages.SUBTASK_EXCEPTION_REPORTED,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable);
@@ -154,7 +154,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
if (retryCount.get() == 0) {
LOGGER.warn(
PipeMessages.FAILED_TO_EXECUTE_SUBTASK_RETRY_FOREVER,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable.getMessage(),
@@ -165,7 +165,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
PipeLogger.log(
LOGGER::warn,
PipeMessages.RETRY_EXECUTING_SUBTASK_FOREVER,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
@@ -176,7 +176,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
} catch (final InterruptedException e) {
LOGGER.warn(
PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName());
Thread.currentThread().interrupt();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 2da797c2b3b..29f651a1ac9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -114,7 +114,7 @@ public abstract class PipeSubtask
if (totalRetryCount != 0) {
LOGGER.warn(
"Successfully executed subtask {}({}) after {} retries.",
- taskID,
+ getDisplayTaskID(),
this.getClass().getSimpleName(),
totalRetryCount);
}
@@ -196,6 +196,10 @@ public abstract class PipeSubtask
return taskID;
}
+ public String getDisplayTaskID() {
+ return taskID;
+ }
+
public long getCreationTime() {
return creationTime;
}