This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch try-separate-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/try-separate-lock by this push:
new 090aefece97 ams
090aefece97 is described below
commit 090aefece971a88a7e6d099c4167a78feea46bad
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 29 15:24:04 2026 +0800
ams
---
.../agent/plugin/PipeConfigNodePluginAgent.java | 2 +-
.../pipe/sink/protocol/IoTDBConfigRegionSink.java | 4 +-
.../dataregion/PipeDataRegionPluginAgent.java | 14 +++----
.../task/builder/PipeDataNodeTaskBuilder.java | 2 +-
.../pipe/agent/task/stage/PipeTaskSinkStage.java | 12 +++---
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 6 +--
.../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +-
.../PipeDataNodeRemainingEventAndTimeOperator.java | 10 ++---
.../task/stage/SubscriptionTaskSinkStage.java | 16 ++++----
.../subtask/SubscriptionSinkSubtaskManager.java | 37 +++++++++---------
.../commons/pipe/agent/plugin/PipePluginAgent.java | 44 +++++++++++-----------
11 files changed, 72 insertions(+), 77 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index 8ddbb73398c..f1441c4c6e4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -32,7 +32,7 @@ public class PipeConfigNodePluginAgent extends PipePluginAgent {
}
@Override
- protected PipeSourceConstructor createPipeExtractorConstructor(
+ protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionSourceConstructor();
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 5ce03f02598..36bd1cb1a5e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -192,9 +192,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
true);
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Successfully transferred config event {}.", pipeConfigRegionWritePlanEvent);
- }
+ LOGGER.info("Successfully transferred config event {}.", pipeConfigRegionWritePlanEvent);
}
private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index ae27a7eff33..ec6d4f2fa98 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -63,18 +63,18 @@ public class PipeDataRegionPluginAgent extends PipePluginAgent {
@Override
public void validate(
String pipeName,
- Map<String, String> extractorAttributes,
+ Map<String, String> sourceAttributes,
Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes)
+ Map<String, String> sinkAttributes)
throws Exception {
- PipeExtractor temporaryExtractor = validateExtractor(extractorAttributes);
+ PipeExtractor temporaryExtractor = validateSource(sourceAttributes);
PipeProcessor temporaryProcessor = validateProcessor(processorAttributes);
- PipeConnector temporaryConnector = validateConnector(pipeName, connectorAttributes);
+ PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes);
// validate visibility
// TODO: validate visibility for schema region and config region
Visibility pipeVisibility =
- VisibilityUtils.calculateFromExtractorParameters(new PipeParameters(extractorAttributes));
+ VisibilityUtils.calculateFromExtractorParameters(new PipeParameters(sourceAttributes));
Visibility extractorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
Visibility processorVisibility =
@@ -88,13 +88,13 @@ public class PipeDataRegionPluginAgent extends PipePluginAgent {
"The visibility of the pipe (%s, %s) is not compatible with the visibility of the extractor (%s, %s, %s), processor (%s, %s, %s), and connector (%s, %s, %s).",
pipeName,
pipeVisibility,
- extractorAttributes,
+ sourceAttributes,
temporaryExtractor.getClass().getName(),
extractorVisibility,
processorAttributes,
temporaryProcessor.getClass().getName(),
processorVisibility,
- connectorAttributes,
+ sinkAttributes,
temporaryConnector.getClass().getName(),
connectorVisibility));
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 038420527fd..f9609869b45 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -127,7 +127,7 @@ public class PipeDataNodeTaskBuilder {
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
regionId,
sourceStage.getEventSupplier(),
- sinkStage.getPipeConnectorPendingQueue(),
+ sinkStage.getPipeSinkPendingQueue(),
PROCESSOR_EXECUTOR,
pipeTaskMeta,
pipeStaticMeta
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index c24db53e610..a22fbb536d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -34,7 +34,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
protected final String pipeName;
protected final long creationTime;
- protected final PipeParameters pipeConnectorParameters;
+ protected final PipeParameters pipeSinkParameters;
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
@@ -43,12 +43,12 @@ public class PipeTaskSinkStage extends PipeTaskStage {
public PipeTaskSinkStage(
String pipeName,
long creationTime,
- PipeParameters pipeConnectorParameters,
+ PipeParameters pipeSinkParameters,
int regionId,
Supplier<? extends PipeSinkSubtaskExecutor> executor) {
this.pipeName = pipeName;
this.creationTime = creationTime;
- this.pipeConnectorParameters = pipeConnectorParameters;
+ this.pipeSinkParameters = pipeSinkParameters;
this.regionId = regionId;
this.executor = executor;
@@ -60,7 +60,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
PipeSinkSubtaskManager.instance()
.register(
executor,
- pipeConnectorParameters,
+ pipeSinkParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, regionId));
}
@@ -85,7 +85,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
- public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
- return PipeSinkSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
+ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
+ return PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 0df3a773b9c..35f7983075d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -39,9 +39,9 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable {
protected int registeredTaskCount;
public PipeSinkSubtaskLifeCycle(
- PipeSinkSubtaskExecutor executor,
- PipeSinkSubtask subtask,
- UnboundedBlockingPendingQueue<Event> pendingQueue) {
+ final PipeSinkSubtaskExecutor executor,
+ final PipeSinkSubtask subtask,
+ final UnboundedBlockingPendingQueue<Event> pendingQueue) {
this.executor = executor;
this.subtask = subtask;
this.pendingQueue = pendingQueue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index b249ed7b1e9..3a461fe0395 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -243,7 +243,7 @@ public class PipeSinkSubtaskManager {
}
}
- public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
final String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index f54e8bcfccb..38cfb579622 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
// Calculate from schema region extractors directly for it requires less computation
- private final Set<IoTDBSchemaRegionSource> schemaRegionExtractors =
+ private final Set<IoTDBSchemaRegionSource> schemaRegionSources =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
@@ -105,7 +105,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
tsfileEventCount.get()
+ rawTabletEventCount.get()
+ insertNodeEventCount.get()
- + schemaRegionExtractors.stream()
+ + schemaRegionSources.stream()
.map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
@@ -157,7 +157,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
}
final long totalSchemaRegionWriteEventCount =
- schemaRegionExtractors.stream()
+ schemaRegionSources.stream()
.map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
@@ -192,8 +192,8 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
//////////////////////////// Register & deregister (pipe integration) ////////////////////////////
- void register(final IoTDBSchemaRegionSource extractor) {
- schemaRegionExtractors.add(extractor);
+ void register(final IoTDBSchemaRegionSource source) {
+ schemaRegionSources.add(source);
}
//////////////////////////// Rate ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 233164b4d1d..73fca57a1fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -31,12 +31,12 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {
public SubscriptionTaskSinkStage(
- String pipeName,
- long creationTime,
- PipeParameters pipeConnectorParameters,
- int regionId,
- PipeSinkSubtaskExecutor executor) {
- super(pipeName, creationTime, pipeConnectorParameters, regionId, () -> executor);
+ final String pipeName,
+ final long creationTime,
+ final PipeParameters pipeSinkParameters,
+ final int regionId,
+ final PipeSinkSubtaskExecutor executor) {
+ super(pipeName, creationTime, pipeSinkParameters, regionId, () -> executor);
}
@Override
@@ -45,7 +45,7 @@ public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {
SubscriptionSinkSubtaskManager.instance()
.register(
executor.get(),
- pipeConnectorParameters,
+ pipeSinkParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, regionId));
}
@@ -70,7 +70,7 @@ public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
- public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
return SubscriptionSinkSubtaskManager.instance()
.getPipeConnectorPendingQueue(connectorSubtaskId);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index edff8257c65..07def3ff4d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -64,10 +64,10 @@ public class SubscriptionSinkSubtaskManager {
public synchronized String register(
final PipeSinkSubtaskExecutor executor,
- final PipeParameters pipeConnectorParameters,
+ final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorKey =
- pipeConnectorParameters
+ pipeSinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -86,13 +86,13 @@ public class SubscriptionSinkSubtaskManager {
environment.getRegionId(),
connectorKey);
- boolean realTimeFirst =
- pipeConnectorParameters.getBooleanOrDefault(
+ final boolean realTimeFirst =
+ pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
- String attributeSortedString = generateAttributeSortedString(pipeConnectorParameters);
+ String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
attributeSortedString = "__subscription_" + attributeSortedString;
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
@@ -101,36 +101,33 @@ public class SubscriptionSinkSubtaskManager {
? new PipeRealtimePriorityBlockingQueue()
: new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());
- final PipeConnector pipeConnector =
- PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters);
+ final PipeConnector pipeSink =
+ PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters);
// 1. Construct, validate and customize PipeConnector, and then handshake (create connection)
// with the target
try {
- pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
- pipeConnector.customize(
- pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment));
- pipeConnector.handshake();
+ pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
+ pipeSink.customize(pipeSinkParameters, new PipeTaskRuntimeConfiguration(environment));
+ pipeSink.handshake();
} catch (final Exception e) {
try {
- pipeConnector.close();
+ pipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
- "Failed to close connector after failed to initialize connector. "
- + "Ignore this exception.",
+ "Failed to close sink after failed to initialize sink. " + "Ignore this exception.",
closeException);
}
- throw new PipeException(
- "Failed to construct PipeConnector, because of " + e.getMessage(), e);
+ throw new PipeException("Failed to construct PipeSink, because of " + e.getMessage(), e);
}
// 2. Fetch topic and consumer group id from connector parameters
- final String topicName = pipeConnectorParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
+ final String topicName = pipeSinkParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
final String consumerGroupId =
- pipeConnectorParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
+ pipeSinkParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
if (Objects.isNull(topicName) || Objects.isNull(consumerGroupId)) {
throw new SubscriptionException(
String.format(
- "Failed to construct subscription connector, because of %s or %s does not exist in pipe connector parameters",
+ "Failed to construct subscription sink, because of %s or %s does not exist in pipe connector parameters",
PipeSinkConstant.SINK_TOPIC_KEY,
PipeSinkConstant.SINK_CONSUMER_GROUP_KEY));
}
@@ -142,7 +139,7 @@ public class SubscriptionSinkSubtaskManager {
attributeSortedString,
0,
pendingQueue,
- pipeConnector,
+ pipeSink,
topicName,
consumerGroupId);
final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 33acd4beb63..1649660425f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -53,7 +53,7 @@ public abstract class PipePluginAgent {
private final PipeProcessorConstructor pipeProcessorConstructor;
private final PipeSinkConstructor pipeSinkConstructor;
- protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+ protected PipePluginAgent(final PipePluginMetaKeeper pipePluginMetaKeeper) {
this.pipePluginMetaKeeper = pipePluginMetaKeeper;
pipeExtractorConstructor = createPipeExtractorConstructor(pipePluginMetaKeeper);
pipeProcessorConstructor = createPipeProcessorConstructor(pipePluginMetaKeeper);
@@ -61,48 +61,48 @@ public abstract class PipePluginAgent {
}
protected abstract PipeSourceConstructor createPipeExtractorConstructor(
- PipePluginMetaKeeper pipePluginMetaKeeper);
+ final PipePluginMetaKeeper pipePluginMetaKeeper);
protected abstract PipeProcessorConstructor createPipeProcessorConstructor(
- PipePluginMetaKeeper pipePluginMetaKeeper);
+ final PipePluginMetaKeeper pipePluginMetaKeeper);
protected abstract PipeSinkConstructor createPipeConnectorConstructor(
- PipePluginMetaKeeper pipePluginMetaKeeper);
+ final PipePluginMetaKeeper pipePluginMetaKeeper);
- public final PipeExtractor reflectSource(PipeParameters extractorParameters) {
- return pipeExtractorConstructor.reflectPlugin(extractorParameters);
+ public final PipeExtractor reflectSource(final PipeParameters sourceParameters) {
+ return pipeExtractorConstructor.reflectPlugin(sourceParameters);
}
- public final PipeProcessor reflectProcessor(PipeParameters processorParameters) {
+ public final PipeProcessor reflectProcessor(final PipeParameters processorParameters) {
return pipeProcessorConstructor.reflectPlugin(processorParameters);
}
- public final PipeConnector reflectSink(PipeParameters connectorParameters) {
- return pipeSinkConstructor.reflectPlugin(connectorParameters);
+ public final PipeConnector reflectSink(final PipeParameters sinkParameters) {
+ return pipeSinkConstructor.reflectPlugin(sinkParameters);
}
public void validate(
- String pipeName,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes)
+ final String pipeName,
+ final Map<String, String> sourceAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> sinkAttributes)
throws Exception {
- validateExtractor(extractorAttributes);
+ validateSource(sourceAttributes);
validateProcessor(processorAttributes);
- validateConnector(pipeName, connectorAttributes);
+ validateSink(pipeName, sinkAttributes);
}
- protected PipeExtractor validateExtractor(Map<String, String> extractorAttributes)
+ protected PipeExtractor validateSource(final Map<String, String> sourceAttributes)
throws Exception {
- final PipeParameters extractorParameters = new PipeParameters(extractorAttributes);
- final PipeExtractor temporaryExtractor = reflectSource(extractorParameters);
+ final PipeParameters sourceParameters = new PipeParameters(sourceAttributes);
+ final PipeExtractor temporaryExtractor = reflectSource(sourceParameters);
try {
- temporaryExtractor.validate(new PipeParameterValidator(extractorParameters));
+ temporaryExtractor.validate(new PipeParameterValidator(sourceParameters));
} finally {
try {
temporaryExtractor.close();
} catch (Exception e) {
- LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(), e);
+ LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e);
}
}
return temporaryExtractor;
@@ -124,8 +124,8 @@ public abstract class PipePluginAgent {
return temporaryProcessor;
}
- protected PipeConnector validateConnector(
- String pipeName, Map<String, String> connectorAttributes) throws Exception {
+ protected PipeConnector validateSink(String pipeName, Map<String, String> connectorAttributes)
+ throws Exception {
final PipeParameters connectorParameters = new PipeParameters(connectorAttributes);
final PipeConnector temporaryConnector = reflectSink(connectorParameters);
try {