This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba29144921 Pipe: Mask sensitive attributes in sink subtask display
strings (#17737)
2ba29144921 is described below
commit 2ba291449219c500ab05a2f61bf2a7022713c5e5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 28 18:39:31 2026 +0800
Pipe: Mask sensitive attributes in sink subtask display strings (#17737)
* Pipe: Mask sensitive attributes in sink subtask display strings
Use masked PipeParameters display string for logs, metrics and subtask
names while keeping unmasked sorted string for internal lifecycle map keys.
Also treat scp.password as a sensitive parameter.
Co-authored-by: Cursor <[email protected]>
* Pipe: Fix sink compression timer keying and masked error paths
Key compressionTimerMap by per-subtask taskID instead of masked attribute
string to avoid collisions when only sensitive fields differ. Use masked
display strings in subtask-not-found exceptions and pass sinkTaskId from
runtime environment to IoTDB sinks for timer lookup.
Co-authored-by: Cursor <[email protected]>
---------
Co-authored-by: Cursor <[email protected]>
---
.../api/customizer/parameter/PipeParameters.java | 1 +
.../task/subtask/sink/PipeSinkSubtaskManager.java | 54 ++++++++++++++++++----
.../metric/sink/PipeDataRegionSinkMetrics.java | 10 ++--
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 5 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 5 +-
.../thrift/sync/IoTDBDataRegionSyncSink.java | 5 +-
.../plugin/env/PipeTaskSinkRuntimeEnvironment.java | 9 ++++
.../commons/pipe/sink/protocol/IoTDBSink.java | 7 ++-
8 files changed, 70 insertions(+), 26 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c13f87ae579..f9ef2a64a83 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -429,6 +429,7 @@ public class PipeParameters {
static {
KEYS.add("ssl.trust-store-pwd");
+ KEYS.add("scp.password");
KEYS.add("password");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 3ad99ca5c06..01552eec5ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -64,6 +64,8 @@ public class PipeSinkSubtaskManager {
private final Map<String, List<PipeSinkSubtaskLifeCycle>>
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
+ private final Map<String, String> attributeSortedString2DisplayString = new
HashMap<>();
+
public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
@@ -92,6 +94,7 @@ public class PipeSinkSubtaskManager {
final int sinkNum;
boolean realTimeFirst = false;
String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters);
+ final String attributeDisplayString =
generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
sinkNum =
pipeSinkParameters.getIntOrDefault(
@@ -120,7 +123,9 @@ public class PipeSinkSubtaskManager {
sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
- environment.setAttributeSortedString(attributeSortedString);
+ final String attributeDisplayStringWithPrefix =
+ isDataRegionSink ? "data_" + attributeDisplayString : "schema_" +
attributeDisplayString;
+ environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
@@ -138,6 +143,12 @@ public class PipeSinkSubtaskManager {
}
for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
+ final String taskID =
+ String.format(
+ "%s_%s_%s",
+ attributeDisplayStringWithPrefix,
environment.getCreationTime(), sinkIndex);
+ environment.setSinkTaskId(taskID);
+
final PipeConnector pipeSink =
isDataRegionSink
?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
@@ -168,10 +179,9 @@ public class PipeSinkSubtaskManager {
// 2. Construct PipeConnectorSubtaskLifeCycle to manage
PipeConnectorSubtask's life cycle
final PipeSinkSubtask pipeSinkSubtask =
new PipeSinkSubtask(
- String.format(
- "%s_%s_%s", attributeSortedString,
environment.getCreationTime(), sinkIndex),
+ taskID,
environment.getCreationTime(),
- attributeSortedString,
+ attributeDisplayStringWithPrefix,
sinkIndex,
pendingQueue,
pipeSink);
@@ -182,11 +192,13 @@ public class PipeSinkSubtaskManager {
LOGGER.info(
DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED,
- attributeSortedString,
+ attributeDisplayStringWithPrefix,
executor.getWorkingThreadName(),
executor.getCallbackThreadName());
attributeSortedString2SubtaskLifeCycleMap.put(
attributeSortedString, pipeSinkSubtaskLifeCycleList);
+ attributeSortedString2DisplayString.put(
+ attributeSortedString, attributeDisplayStringWithPrefix);
}
for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -203,7 +215,7 @@ public class PipeSinkSubtaskManager {
final int regionId,
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
+ throwNoSuchSubtaskException(attributeSortedString);
}
final List<PipeSinkSubtaskLifeCycle> lifeCycles =
@@ -219,6 +231,7 @@ public class PipeSinkSubtaskManager {
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
+ attributeSortedString2DisplayString.remove(attributeSortedString);
executor.shutdown();
LOGGER.info(
DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN,
@@ -234,7 +247,7 @@ public class PipeSinkSubtaskManager {
public synchronized void start(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
+ throwNoSuchSubtaskException(attributeSortedString);
}
for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -245,7 +258,7 @@ public class PipeSinkSubtaskManager {
public synchronized void stop(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
+ throwNoSuchSubtaskException(attributeSortedString);
}
for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -258,7 +271,8 @@ public class PipeSinkSubtaskManager {
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
- DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK +
attributeSortedString);
+ DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK
+ + getDisplayStringForException(attributeSortedString));
}
// All subtasks share the same pending queue
@@ -268,13 +282,33 @@ public class PipeSinkSubtaskManager {
.getPendingQueue();
}
- private String generateAttributeSortedString(final PipeParameters
pipeConnectorParameters) {
+ private static String generateAttributeSortedString(
+ final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return sortedStringSourceMap.toString();
}
+ /** Masked attribute string for logs, metrics and exception messages. */
+ private static String generateAttributeDisplayString(
+ final PipeParameters pipeConnectorParameters) {
+ final TreeMap<String, String> filteredAttributes =
+ new TreeMap<>(pipeConnectorParameters.getAttribute());
+ filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
+ return new PipeParameters(filteredAttributes).toString();
+ }
+
+ private void throwNoSuchSubtaskException(final String attributeSortedString)
{
+ throw new PipeException(
+ FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE
+ + getDisplayStringForException(attributeSortedString));
+ }
+
+ private String getDisplayStringForException(final String
attributeSortedString) {
+ return
attributeSortedString2DisplayString.getOrDefault(attributeSortedString,
"unknown");
+ }
+
///////////////////////// Singleton Instance Holder
/////////////////////////
private PipeSinkSubtaskManager() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index dd7707d1b96..9b2f876ff2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -199,8 +199,8 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
private void createTimer(final String taskID) {
final PipeSinkSubtask sink = sinkMap.get(taskID);
- compressionTimerMap.putIfAbsent(
- sink.getAttributeSortedString(),
+ compressionTimerMap.put(
+ taskID,
metricService.getOrCreateTimer(
Metric.PIPE_COMPRESSION_TIME.toString(),
MetricLevel.IMPORTANT,
@@ -394,7 +394,7 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(sink.getCreationTime()));
- compressionTimerMap.remove(sink.getAttributeSortedString());
+ compressionTimerMap.remove(taskID);
}
private void removeHistogram(final String taskID) {
@@ -492,8 +492,8 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
rate.mark();
}
- public Timer getCompressionTimer(final String attributeSortedString) {
- return Objects.isNull(metricService) ? null :
compressionTimerMap.get(attributeSortedString);
+ public Timer getCompressionTimer(final String taskID) {
+ return Objects.isNull(metricService) ? null :
compressionTimerMap.get(taskID);
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index ea83524988b..4f2dab1bfa8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -604,9 +604,8 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
@Override
protected byte[] compressIfNeeded(final byte[] reqInBytes) throws
IOException {
- if (Objects.isNull(compressionTimer) &&
Objects.nonNull(attributeSortedString)) {
- compressionTimer =
-
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
+ if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
+ compressionTimer =
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
}
return super.compressIfNeeded(reqInBytes);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index c607490b362..bd3f06ba778 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -508,9 +508,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
@Override
public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws
IOException {
- if (Objects.isNull(compressionTimer) &&
Objects.nonNull(attributeSortedString)) {
- compressionTimer =
-
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
+ if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
+ compressionTimer =
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
}
return super.compressIfNeeded(req);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index d9e25f5e09f..eb7d39864c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -595,9 +595,8 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
@Override
public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws
IOException {
- if (Objects.isNull(compressionTimer) &&
Objects.nonNull(attributeSortedString)) {
- compressionTimer =
-
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
+ if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
+ compressionTimer =
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
}
return super.compressIfNeeded(req);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
index b8382891348..26081d9c78a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config.plugin.env;
public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment
{
private String attributeSortedString;
+ private String sinkTaskId;
public PipeTaskSinkRuntimeEnvironment(
final String pipeName, final long creationTime, final int regionId) {
@@ -34,4 +35,12 @@ public class PipeTaskSinkRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
public void setAttributeSortedString(String attributeSortedString) {
this.attributeSortedString = attributeSortedString;
}
+
+ public String getSinkTaskId() {
+ return sinkTaskId;
+ }
+
+ public void setSinkTaskId(final String sinkTaskId) {
+ this.sinkTaskId = sinkTaskId;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index a52779650f8..b5662aeec2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -189,6 +189,7 @@ public abstract class IoTDBSink implements PipeConnector,
PipeConnectorWithEvent
private final AtomicLong totalUncompressedSize = new AtomicLong(0);
private final AtomicLong totalCompressedSize = new AtomicLong(0);
protected String attributeSortedString;
+ protected String sinkTaskId;
protected Timer compressionTimer;
protected boolean isRealtimeFirst;
@@ -391,8 +392,10 @@ public abstract class IoTDBSink implements PipeConnector,
PipeConnectorWithEvent
throws Exception {
final PipeRuntimeEnvironment environment =
configuration.getRuntimeEnvironment();
if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
- attributeSortedString =
- ((PipeTaskSinkRuntimeEnvironment)
environment).getAttributeSortedString();
+ final PipeTaskSinkRuntimeEnvironment sinkEnvironment =
+ (PipeTaskSinkRuntimeEnvironment) environment;
+ attributeSortedString = sinkEnvironment.getAttributeSortedString();
+ sinkTaskId = sinkEnvironment.getSinkTaskId();
}
nodeUrls.clear();