This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch try-separate-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/try-separate-lock by this push:
new 250c591eda3 y
250c591eda3 is described below
commit 250c591eda3bd317926c633e5da2032d845af23f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 29 15:25:40 2026 +0800
y
---
.../agent/plugin/PipeConfigNodePluginAgent.java | 2 +-
.../dataregion/PipeDataRegionPluginAgent.java | 4 ++--
.../schemaregion/PipeSchemaRegionPluginAgent.java | 4 ++--
.../commons/pipe/agent/plugin/PipePluginAgent.java | 26 +++++++++++-----------
4 files changed, 18 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index f1441c4c6e4..44e5675402c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -44,7 +44,7 @@ public class PipeConfigNodePluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSinkConstructor createPipeConnectorConstructor(
+ protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionSinkConstructor();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index ec6d4f2fa98..a8d002fb27c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -42,7 +42,7 @@ public class PipeDataRegionPluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSourceConstructor createPipeExtractorConstructor(
+ protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeDataRegionSourceConstructor((DataNodePipePluginMetaKeeper)
pipePluginMetaKeeper);
}
@@ -55,7 +55,7 @@ public class PipeDataRegionPluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSinkConstructor createPipeConnectorConstructor(
+ protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeDataRegionSinkConstructor((DataNodePipePluginMetaKeeper)
pipePluginMetaKeeper);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
index 549030073b1..cd4293c6ac0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
@@ -32,7 +32,7 @@ public class PipeSchemaRegionPluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSourceConstructor createPipeExtractorConstructor(
+ protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionSourceConstructor();
}
@@ -44,7 +44,7 @@ public class PipeSchemaRegionPluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSinkConstructor createPipeConnectorConstructor(
+ protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionSinkConstructor();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 1649660425f..519d263449b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -55,18 +55,18 @@ public abstract class PipePluginAgent {
protected PipePluginAgent(final PipePluginMetaKeeper pipePluginMetaKeeper) {
this.pipePluginMetaKeeper = pipePluginMetaKeeper;
- pipeExtractorConstructor =
createPipeExtractorConstructor(pipePluginMetaKeeper);
+ pipeExtractorConstructor =
createPipeSourceConstructor(pipePluginMetaKeeper);
pipeProcessorConstructor =
createPipeProcessorConstructor(pipePluginMetaKeeper);
- pipeSinkConstructor = createPipeConnectorConstructor(pipePluginMetaKeeper);
+ pipeSinkConstructor = createPipeSinkConstructor(pipePluginMetaKeeper);
}
- protected abstract PipeSourceConstructor createPipeExtractorConstructor(
+ protected abstract PipeSourceConstructor createPipeSourceConstructor(
final PipePluginMetaKeeper pipePluginMetaKeeper);
protected abstract PipeProcessorConstructor createPipeProcessorConstructor(
final PipePluginMetaKeeper pipePluginMetaKeeper);
- protected abstract PipeSinkConstructor createPipeConnectorConstructor(
+ protected abstract PipeSinkConstructor createPipeSinkConstructor(
final PipePluginMetaKeeper pipePluginMetaKeeper);
public final PipeExtractor reflectSource(final PipeParameters
sourceParameters) {
@@ -124,24 +124,24 @@ public abstract class PipePluginAgent {
return temporaryProcessor;
}
- protected PipeConnector validateSink(String pipeName, Map<String, String>
connectorAttributes)
+ protected PipeConnector validateSink(String pipeName, Map<String, String>
sinkAttributes)
throws Exception {
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
- final PipeConnector temporaryConnector = reflectSink(connectorParameters);
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+ final PipeConnector temporarySink = reflectSink(sinkParameters);
try {
- temporaryConnector.validate(new
PipeParameterValidator(connectorParameters));
- temporaryConnector.customize(
- connectorParameters,
+ temporarySink.validate(new PipeParameterValidator(sinkParameters));
+ temporarySink.customize(
+ sinkParameters,
new PipeTaskRuntimeConfiguration(new
PipeTaskTemporaryRuntimeEnvironment(pipeName)));
- temporaryConnector.handshake();
+ temporarySink.handshake();
} finally {
try {
- temporaryConnector.close();
+ temporarySink.close();
} catch (Exception e) {
LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(),
e);
}
}
- return temporaryConnector;
+ return temporarySink;
}
/**