This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch rc/2.0.10
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/2.0.10 by this push:
new f8b17f84e69 Prevent pipe sink task id from logging secrets (#17981)
f8b17f84e69 is described below
commit f8b17f84e69a431fc621e75012cdb181767b3707
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jun 29 17:41:05 2026 +0800
Prevent pipe sink task id from logging secrets (#17981)
---
.../manual/IoTDBPipeTypeConversionISessionIT.java | 3 +-
.../api/customizer/parameter/PipeParameters.java | 10 ++--
.../subtask/processor/PipeProcessorSubtask.java | 4 +-
.../agent/task/subtask/sink/PipeSinkSubtask.java | 36 +++++++++++--
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 8 +--
.../task/subtask/sink/PipeSinkSubtaskManager.java | 62 ++++++++++++++++------
.../metric/schema/PipeSchemaRegionSinkMetrics.java | 12 ++++-
.../metric/sink/PipeDataRegionSinkMetrics.java | 15 ++++--
.../agent/task/PipeSinkSubtaskExecutorTest.java | 1 +
.../task/subtask/sink/PipeSinkSubtaskTest.java | 44 +++++++++++++++
.../agent/task/execution/PipeSubtaskExecutor.java | 10 ++--
.../task/subtask/PipeAbstractSinkSubtask.java | 4 +-
.../agent/task/subtask/PipeReportableSubtask.java | 16 +++---
.../pipe/agent/task/subtask/PipeSubtask.java | 6 ++-
14 files changed, 180 insertions(+), 51 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
index 54f4fe2c51f..f2985389d72 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -56,6 +56,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@@ -324,7 +325,7 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeDualTreeModel
generateMeasurementSchemas();
// Generate createTimeSeries in sender and receiver
- String uuid = "bcdedit";
+ String uuid = "bcdedit" + UUID.randomUUID().toString().replace("-", "");
for (Pair<MeasurementSchema, MeasurementSchema> pair : measurementSchemas)
{
createTimeSeries(uuid, pair.left.getMeasurementName(),
pair.left.getType().name(), senderEnv);
createTimeSeries(
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c13f87ae579..ac882f6ea4f 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeCo
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -433,13 +434,14 @@ public class PipeParameters {
}
static String hide(final String key, final String value) {
- if (Objects.isNull(key)) {
- return value;
- }
- if (KEYS.contains(KeyReducer.reduce(key))) {
+ if (isHiddenKey(key)) {
return PLACEHOLDER;
}
return value;
}
+
+ public static boolean isHiddenKey(final String key) {
+ return Objects.nonNull(key) &&
KEYS.contains(KeyReducer.reduce(key).toLowerCase(Locale.ROOT));
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index fe9737b0d7d..c37aab5af2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -260,7 +260,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
throw new PipeException(
String.format(
"Exception in pipe process, subtask: %s, last event: %s, root
cause: %s",
- taskID,
+ getDisplayTaskID(),
lastEvent instanceof EnrichedEvent
? ((EnrichedEvent) lastEvent).coreReportMessage()
: lastEvent,
@@ -300,7 +300,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
} catch (final Exception e) {
LOGGER.info(
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_PROCESSOR_SUBTASK,
- taskID,
+ getDisplayTaskID(),
ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 90d325f6d23..10b746778e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -60,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String attributeSortedString;
+ private final String attributeDisplayString;
private final int sinkIndex;
// Now parallel connectors run the same time, thus the heartbeat events are
not sure
@@ -75,8 +76,27 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask
{
final int sinkIndex,
final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
final PipeConnector outputPipeConnector) {
+ this(
+ taskID,
+ creationTime,
+ attributeSortedString,
+ attributeSortedString,
+ sinkIndex,
+ inputPendingQueue,
+ outputPipeConnector);
+ }
+
+ public PipeSinkSubtask(
+ final String taskID,
+ final long creationTime,
+ final String attributeSortedString,
+ final String attributeDisplayString,
+ final int sinkIndex,
+ final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
+ final PipeConnector outputPipeConnector) {
super(taskID, creationTime, outputPipeConnector);
this.attributeSortedString = attributeSortedString;
+ this.attributeDisplayString = attributeDisplayString;
this.sinkIndex = sinkIndex;
this.inputPendingQueue = inputPendingQueue;
@@ -156,7 +176,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
DataNodePipeMessages.PIPECONNECTOR
+ outputPipeSink.getClass().getName()
+ "(id: "
- + taskID
+ + getDisplayTaskID()
+ ")"
+ " heartbeat failed, or encountered failure when transferring
generic event. Failure: "
+ e.getMessage(),
@@ -181,13 +201,13 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
outputPipeSink.close();
LOGGER.info(
DataNodePipeMessages.PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS,
- taskID,
+ getDisplayTaskID(),
outputPipeSink,
System.currentTimeMillis() - startTime);
} catch (final Exception e) {
LOGGER.info(
DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_CONNECTOR_SUBTASK,
- taskID,
+ getDisplayTaskID(),
ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
@@ -366,4 +386,14 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
lastExceptionTime = Long.MAX_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
+
+ @Override
+ public String getDisplayTaskID() {
+ return generateDisplayTaskID(attributeDisplayString, creationTime,
sinkIndex);
+ }
+
+ static String generateDisplayTaskID(
+ final String attributeDisplayString, final long creationTime, final int
sinkIndex) {
+ return String.format("%s_%s_%s", attributeDisplayString, creationTime,
sinkIndex);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 42b1ae91366..b4933234659 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -75,7 +75,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
registeredTaskCount++;
LOGGER.info(
DataNodePipeMessages.REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
@@ -112,7 +112,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
registeredTaskCount--;
LOGGER.info(
DataNodePipeMessages.DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
@@ -135,7 +135,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
runningTaskCount++;
LOGGER.info(
DataNodePipeMessages.START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
@@ -152,7 +152,7 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
runningTaskCount--;
LOGGER.info(
DataNodePipeMessages.STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT,
- subtask,
+ subtask.getDisplayTaskID(),
runningTaskCount,
registeredTaskCount);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 3ad99ca5c06..564a29d51cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -64,6 +64,8 @@ public class PipeSinkSubtaskManager {
private final Map<String, List<PipeSinkSubtaskLifeCycle>>
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
+ private final Map<String, String> attributeSortedString2DisplayString = new
HashMap<>();
+
public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
@@ -92,19 +94,14 @@ public class PipeSinkSubtaskManager {
final int sinkNum;
boolean realTimeFirst = false;
String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters);
+ final String attributeDisplayString =
generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
sinkNum =
pipeSinkParameters.getIntOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
- PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
- pipeSinkParameters
- .getStringOrDefault(
- Arrays.asList(
- PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
-
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
- .toLowerCase())
+
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
? 1
:
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
realTimeFirst =
@@ -120,7 +117,9 @@ public class PipeSinkSubtaskManager {
sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
- environment.setAttributeSortedString(attributeSortedString);
+ final String attributeDisplayStringWithPrefix =
+ isDataRegionSink ? "data_" + attributeDisplayString : "schema_" +
attributeDisplayString;
+ environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
@@ -138,6 +137,11 @@ public class PipeSinkSubtaskManager {
}
for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
+ final String taskID =
+ String.format(
+ "%s_%s_%s",
+ attributeDisplayStringWithPrefix,
environment.getCreationTime(), sinkIndex);
+
final PipeConnector pipeSink =
isDataRegionSink
?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
@@ -168,10 +172,10 @@ public class PipeSinkSubtaskManager {
// 2. Construct PipeConnectorSubtaskLifeCycle to manage
PipeConnectorSubtask's life cycle
final PipeSinkSubtask pipeSinkSubtask =
new PipeSinkSubtask(
- String.format(
- "%s_%s_%s", attributeSortedString,
environment.getCreationTime(), sinkIndex),
+ taskID,
environment.getCreationTime(),
attributeSortedString,
+ attributeDisplayStringWithPrefix,
sinkIndex,
pendingQueue,
pipeSink);
@@ -182,11 +186,13 @@ public class PipeSinkSubtaskManager {
LOGGER.info(
DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED,
- attributeSortedString,
+ attributeDisplayStringWithPrefix,
executor.getWorkingThreadName(),
executor.getCallbackThreadName());
attributeSortedString2SubtaskLifeCycleMap.put(
attributeSortedString, pipeSinkSubtaskLifeCycleList);
+ attributeSortedString2DisplayString.put(
+ attributeSortedString, attributeDisplayStringWithPrefix);
}
for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -203,7 +209,7 @@ public class PipeSinkSubtaskManager {
final int regionId,
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
+ throwNoSuchSubtaskException(attributeSortedString);
}
final List<PipeSinkSubtaskLifeCycle> lifeCycles =
@@ -219,6 +225,7 @@ public class PipeSinkSubtaskManager {
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
+ attributeSortedString2DisplayString.remove(attributeSortedString);
executor.shutdown();
LOGGER.info(
DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN,
@@ -234,7 +241,7 @@ public class PipeSinkSubtaskManager {
public synchronized void start(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
+ throwNoSuchSubtaskException(attributeSortedString);
}
for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -245,7 +252,7 @@ public class PipeSinkSubtaskManager {
public synchronized void stop(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
+ throwNoSuchSubtaskException(attributeSortedString);
}
for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -258,7 +265,8 @@ public class PipeSinkSubtaskManager {
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
- DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK +
attributeSortedString);
+ DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK
+ + getDisplayStringForException(attributeSortedString));
}
// All subtasks share the same pending queue
@@ -268,13 +276,35 @@ public class PipeSinkSubtaskManager {
.getPendingQueue();
}
- private String generateAttributeSortedString(final PipeParameters
pipeConnectorParameters) {
+ private static String generateAttributeSortedString(
+ final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return sortedStringSourceMap.toString();
}
+ /**
+ * Attribute string for logs, metrics and exception messages with sensitive
attributes removed.
+ */
+ static String generateAttributeDisplayString(final PipeParameters
pipeConnectorParameters) {
+ final TreeMap<String, String> filteredAttributes =
+ new TreeMap<>(pipeConnectorParameters.getAttribute());
+ filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
+
filteredAttributes.keySet().removeIf(PipeParameters.ValueHider::isHiddenKey);
+ return filteredAttributes.toString();
+ }
+
+ private void throwNoSuchSubtaskException(final String attributeSortedString)
{
+ throw new PipeException(
+ FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE
+ + getDisplayStringForException(attributeSortedString));
+ }
+
+ private String getDisplayStringForException(final String
attributeSortedString) {
+ return
attributeSortedString2DisplayString.getOrDefault(attributeSortedString,
"unknown");
+ }
+
///////////////////////// Singleton Instance Holder
/////////////////////////
private PipeSinkSubtaskManager() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
index d9b79fdda8e..17ff545251c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
@@ -110,7 +110,9 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
public void deregister(final String taskID) {
if (!connectorMap.containsKey(taskID)) {
-
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
taskID);
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR,
+ getDisplayTaskID(taskID));
return;
}
if (Objects.nonNull(metricService)) {
@@ -125,12 +127,18 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
}
final Rate rate = schemaRateMap.get(taskID);
if (rate == null) {
-
LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE,
taskID);
+ LOGGER.info(
+ DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE,
getDisplayTaskID(taskID));
return;
}
rate.mark();
}
+ private String getDisplayTaskID(final String taskID) {
+ final PipeSinkSubtask connector = connectorMap.get(taskID);
+ return Objects.nonNull(connector) ? connector.getDisplayTaskID() :
"unknown";
+ }
+
//////////////////////////// singleton ////////////////////////////
private static class PipeSchemaRegionSinkMetricsHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index dd7707d1b96..003f6f45ef4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -447,7 +447,9 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
public void deregister(final String taskID) {
if (!sinkMap.containsKey(taskID)) {
-
LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK,
taskID);
+ LOGGER.warn(
+ DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK,
+ getDisplayTaskID(taskID));
return;
}
if (Objects.nonNull(metricService)) {
@@ -462,7 +464,8 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
}
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
- LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK,
taskID);
+ LOGGER.info(
+ DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK,
getDisplayTaskID(taskID));
return;
}
rate.mark();
@@ -474,12 +477,18 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
}
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
- LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1,
taskID);
+ LOGGER.info(
+ DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1,
getDisplayTaskID(taskID));
return;
}
rate.mark();
}
+ private String getDisplayTaskID(final String taskID) {
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
+ return Objects.nonNull(sink) ? sink.getDisplayTaskID() : "unknown";
+ }
+
public void markPipeHeartbeatEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
index a237008ab4e..9cd54b425cf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java
@@ -41,6 +41,7 @@ public class PipeSinkSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
"PipeConnectorSubtaskExecutorTest",
System.currentTimeMillis(),
"TestAttributeSortedString",
+ "TestAttributeSortedString",
0,
mock(UnboundedBlockingPendingQueue.class),
mock(PipeConnector.class)));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index 2a15fb9ea18..e9d76a4fc7e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -23,12 +23,18 @@ import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
public class PipeSinkSubtaskTest {
@@ -47,6 +53,7 @@ public class PipeSinkSubtaskTest {
"PipeSinkSubtaskTest",
System.currentTimeMillis(),
"data_test",
+ "data_test",
0,
(UnboundedBlockingPendingQueue) pendingQueue,
connector));
@@ -60,4 +67,41 @@ public class PipeSinkSubtaskTest {
subtask.close();
}
}
+
+ @Test
+ public void testTransferExceptionUsesDisplayTaskID() throws Exception {
+ final PipeConnector connector = mock(PipeConnector.class);
+ final UnboundedBlockingPendingQueue<Event> pendingQueue =
+ mock(UnboundedBlockingPendingQueue.class);
+ final Event event = mock(Event.class);
+
+ when(pendingQueue.waitedPoll()).thenReturn(event);
+ doThrow(new RuntimeException("No more authentication methods available"))
+ .when(connector)
+ .transfer(any(Event.class));
+
+ final PipeSinkSubtask subtask =
+ new PipeSinkSubtask(
+ "data_{sink=TSFILE_REMOTE_SINK,
sink.scp.password=Iotdb@2026}_1701687309493_0",
+ 1701687309493L,
+ "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}",
+ "data_{sink=TSFILE_REMOTE_SINK, sink.scp.host=172.20.70.119}",
+ 0,
+ pendingQueue,
+ connector);
+
+ try {
+ subtask.executeOnce();
+ Assert.fail();
+ } catch (final PipeException e) {
+ Assert.assertTrue(e.getMessage().contains("Exception in pipe transfer,
subtask: data_{"));
+ Assert.assertTrue(e.getMessage().contains("sink=TSFILE_REMOTE_SINK"));
+
Assert.assertTrue(e.getMessage().contains("sink.scp.host=172.20.70.119"));
+ Assert.assertTrue(e.getMessage().contains("No more authentication
methods available"));
+ Assert.assertFalse(e.getMessage().contains("sink.scp.password"));
+ Assert.assertFalse(e.getMessage().contains("Iotdb@2026"));
+ } finally {
+ subtask.close();
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index c28cfb25692..f14bb483e53 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -99,7 +99,7 @@ public abstract class PipeSubtaskExecutor {
public final synchronized void register(final PipeSubtask subtask) {
if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
- LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED,
getSafeSubtaskStr(subtask.getTaskID()));
+ LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED,
subtask.getDisplayTaskID());
return;
}
@@ -125,13 +125,13 @@ public abstract class PipeSubtaskExecutor {
final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
if (subtask.isSubmittingSelf()) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING,
getSafeSubtaskStr(subTaskID));
+ LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING,
subtask.getDisplayTaskID());
}
} else {
subtask.allowSubmittingSelf();
subtask.submitSelf();
++runningSubtaskNumber;
- LOGGER.info(PipeMessages.SUBTASK_STARTED, getSafeSubtaskStr(subTaskID));
+ LOGGER.info(PipeMessages.SUBTASK_STARTED, subtask.getDisplayTaskID());
}
}
@@ -154,9 +154,9 @@ public abstract class PipeSubtaskExecutor {
if (subtask != null) {
try {
subtask.close();
- LOGGER.info(PipeMessages.SUBTASK_CLOSED, getSafeSubtaskStr(subTaskID));
+ LOGGER.info(PipeMessages.SUBTASK_CLOSED, subtask.getDisplayTaskID());
} catch (final Exception e) {
- LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED,
getSafeSubtaskStr(subTaskID), e);
+ LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED,
subtask.getDisplayTaskID(), e);
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 790e5fe4c6d..9dbf5af2d0c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -199,7 +199,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
PipeMessages.HANDSHAKE_FAILED_STOPPING,
outputPipeSink.getClass().getName(),
MAX_RETRY_TIMES,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable);
@@ -287,7 +287,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
throw new PipeException(
String.format(
PipeMessages.EXCEPTION_IN_PIPE_TRANSFER_FORMAT,
- taskID,
+ getDisplayTaskID(),
event instanceof EnrichedEvent
? ((EnrichedEvent) event).coreReportMessage()
: event,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 1f43403157b..52bc3bd61f8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -88,7 +88,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
if (retryCount.get() == 0) {
LOGGER.warn(
PipeMessages.FAILED_TO_EXECUTE_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable.getMessage(),
@@ -102,7 +102,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
LOGGER::warn,
throwable,
PipeMessages.RETRY_EXECUTING_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
@@ -113,7 +113,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
} catch (final InterruptedException e) {
LOGGER.warn(
PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
e);
@@ -125,7 +125,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
final String errorMessage =
String.format(
PipeMessages.SUBTASK_RETRY_EXCEEDED_FORMAT,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
retryCount.get() - 1,
@@ -139,7 +139,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
: new PipeRuntimeCriticalException(errorMessage));
LOGGER.warn(
PipeMessages.SUBTASK_EXCEPTION_REPORTED,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable);
@@ -154,7 +154,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
if (retryCount.get() == 0) {
LOGGER.warn(
PipeMessages.FAILED_TO_EXECUTE_SUBTASK_RETRY_FOREVER,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
throwable.getMessage(),
@@ -165,7 +165,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
PipeLogger.log(
LOGGER::warn,
PipeMessages.RETRY_EXECUTING_SUBTASK_FOREVER,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
@@ -176,7 +176,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
} catch (final InterruptedException e) {
LOGGER.warn(
PipeMessages.INTERRUPTED_RETRYING_SUBTASK,
- taskID,
+ getDisplayTaskID(),
creationTime,
this.getClass().getSimpleName());
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 2da797c2b3b..29f651a1ac9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -114,7 +114,7 @@ public abstract class PipeSubtask
if (totalRetryCount != 0) {
LOGGER.warn(
"Successfully executed subtask {}({}) after {} retries.",
- taskID,
+ getDisplayTaskID(),
this.getClass().getSimpleName(),
totalRetryCount);
}
@@ -196,6 +196,10 @@ public abstract class PipeSubtask
return taskID;
}
+ public String getDisplayTaskID() {
+ return taskID;
+ }
+
public long getCreationTime() {
return creationTime;
}