This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch 1.3-sub
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/1.3-sub by this push:
new 312e0477dc6 Optimized the hint for subscription (#17115)
312e0477dc6 is described below
commit 312e0477dc632a6ec5c026364a7cdbba7672875d
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jan 30 13:58:29 2026 +0800
Optimized the hint for subscription (#17115)
* ams
* y
* fix
---
.../agent/plugin/PipeConfigNodePluginAgent.java | 4 +-
.../pipe/sink/protocol/IoTDBConfigRegionSink.java | 4 +-
.../pipe/agent/plugin/PipeDataNodePluginAgent.java | 13 ++---
.../dataregion/PipeDataRegionPluginAgent.java | 4 +-
.../schemaregion/PipeSchemaRegionPluginAgent.java | 4 +-
.../task/builder/PipeDataNodeTaskBuilder.java | 2 +-
.../pipe/agent/task/stage/PipeTaskSinkStage.java | 12 ++--
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 6 +-
.../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +-
.../PipeDataNodeRemainingEventAndTimeOperator.java | 10 ++--
.../config/executor/ClusterConfigTaskExecutor.java | 13 ++---
.../task/stage/SubscriptionTaskSinkStage.java | 16 +++---
.../subtask/SubscriptionSinkSubtaskManager.java | 37 ++++++------
.../commons/pipe/agent/plugin/PipePluginAgent.java | 66 +++++++++++-----------
14 files changed, 94 insertions(+), 99 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index 8ddbb73398c..44e5675402c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -32,7 +32,7 @@ public class PipeConfigNodePluginAgent extends PipePluginAgent {
}
@Override
- protected PipeSourceConstructor createPipeExtractorConstructor(
+ protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionSourceConstructor();
}
@@ -44,7 +44,7 @@ public class PipeConfigNodePluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSinkConstructor createPipeConnectorConstructor(
+ protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionSinkConstructor();
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 1b508eb2f5c..5ee983d7945 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -183,9 +183,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
true);
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Successfully transferred config event {}.",
pipeConfigRegionWritePlanEvent);
- }
+ LOGGER.info("Successfully transferred config event {}.",
pipeConfigRegionWritePlanEvent);
}
private void doTransferWrapper(final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 31f404de9d2..437b197b081 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -212,9 +212,9 @@ public class PipeDataNodePluginAgent {
pipeName, extractorAttributes, processorAttributes,
connectorAttributes);
}
- public void validateExtractor(Map<String, String> extractorAttributes)
throws Exception {
- dataRegionAgent.validateExtractor(extractorAttributes);
- schemaRegionAgent.validateExtractor(extractorAttributes);
+ public void validateSource(Map<String, String> sourceAttributes) throws
Exception {
+ dataRegionAgent.validateSource(sourceAttributes);
+ schemaRegionAgent.validateSource(sourceAttributes);
}
public void validateProcessor(Map<String, String> processorAttributes)
throws Exception {
@@ -222,9 +222,8 @@ public class PipeDataNodePluginAgent {
schemaRegionAgent.validateProcessor(processorAttributes);
}
- public void validateConnector(String pipeName, Map<String, String>
connectorAttributes)
- throws Exception {
- dataRegionAgent.validateConnector(pipeName, connectorAttributes);
- schemaRegionAgent.validateConnector(pipeName, connectorAttributes);
+ public void validateSource(String pipeName, Map<String, String>
sinkAttributes) throws Exception {
+ dataRegionAgent.validateSink(pipeName, sinkAttributes);
+ schemaRegionAgent.validateSink(pipeName, sinkAttributes);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index a8583559394..9f41abe28cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -33,7 +33,7 @@ public class PipeDataRegionPluginAgent extends PipePluginAgent {
}
@Override
- protected PipeSourceConstructor createPipeExtractorConstructor(
+ protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeDataRegionSourceConstructor((DataNodePipePluginMetaKeeper)
pipePluginMetaKeeper);
}
@@ -46,7 +46,7 @@ public class PipeDataRegionPluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSinkConstructor createPipeConnectorConstructor(
+ protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeDataRegionSinkConstructor((DataNodePipePluginMetaKeeper)
pipePluginMetaKeeper);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
index 549030073b1..cd4293c6ac0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
@@ -32,7 +32,7 @@ public class PipeSchemaRegionPluginAgent extends PipePluginAgent {
}
@Override
- protected PipeSourceConstructor createPipeExtractorConstructor(
+ protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionSourceConstructor();
}
@@ -44,7 +44,7 @@ public class PipeSchemaRegionPluginAgent extends
PipePluginAgent {
}
@Override
- protected PipeSinkConstructor createPipeConnectorConstructor(
+ protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionSinkConstructor();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 2edc5b943f6..e767731c1ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -124,7 +124,7 @@ public class PipeDataNodeTaskBuilder {
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
regionId,
sourceStage.getEventSupplier(),
- sinkStage.getPipeConnectorPendingQueue(),
+ sinkStage.getPipeSinkPendingQueue(),
PROCESSOR_EXECUTOR,
pipeTaskMeta,
pipeStaticMeta
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index c24db53e610..a22fbb536d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -34,7 +34,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
protected final String pipeName;
protected final long creationTime;
- protected final PipeParameters pipeConnectorParameters;
+ protected final PipeParameters pipeSinkParameters;
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
@@ -43,12 +43,12 @@ public class PipeTaskSinkStage extends PipeTaskStage {
public PipeTaskSinkStage(
String pipeName,
long creationTime,
- PipeParameters pipeConnectorParameters,
+ PipeParameters pipeSinkParameters,
int regionId,
Supplier<? extends PipeSinkSubtaskExecutor> executor) {
this.pipeName = pipeName;
this.creationTime = creationTime;
- this.pipeConnectorParameters = pipeConnectorParameters;
+ this.pipeSinkParameters = pipeSinkParameters;
this.regionId = regionId;
this.executor = executor;
@@ -60,7 +60,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
PipeSinkSubtaskManager.instance()
.register(
executor,
- pipeConnectorParameters,
+ pipeSinkParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime,
regionId));
}
@@ -85,7 +85,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
- public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
- return
PipeSinkSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
+ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
+ return
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 0df3a773b9c..35f7983075d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -39,9 +39,9 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable {
protected int registeredTaskCount;
public PipeSinkSubtaskLifeCycle(
- PipeSinkSubtaskExecutor executor,
- PipeSinkSubtask subtask,
- UnboundedBlockingPendingQueue<Event> pendingQueue) {
+ final PipeSinkSubtaskExecutor executor,
+ final PipeSinkSubtask subtask,
+ final UnboundedBlockingPendingQueue<Event> pendingQueue) {
this.executor = executor;
this.subtask = subtask;
this.pendingQueue = pendingQueue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 97859938828..905e3b535b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -240,7 +240,7 @@ public class PipeSinkSubtaskManager {
}
}
- public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index f54e8bcfccb..38cfb579622 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
// Calculate from schema region extractors directly for it requires less
computation
- private final Set<IoTDBSchemaRegionSource> schemaRegionExtractors =
+ private final Set<IoTDBSchemaRegionSource> schemaRegionSources =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
@@ -105,7 +105,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
tsfileEventCount.get()
+ rawTabletEventCount.get()
+ insertNodeEventCount.get()
- + schemaRegionExtractors.stream()
+ + schemaRegionSources.stream()
.map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
@@ -157,7 +157,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
}
final long totalSchemaRegionWriteEventCount =
- schemaRegionExtractors.stream()
+ schemaRegionSources.stream()
.map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
@@ -192,8 +192,8 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
//////////////////////////// Register & deregister (pipe integration)
////////////////////////////
- void register(final IoTDBSchemaRegionSource extractor) {
- schemaRegionExtractors.add(extractor);
+ void register(final IoTDBSchemaRegionSource source) {
+ schemaRegionSources.add(source);
}
//////////////////////////// Rate ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b711b652829..d4ba79d2c5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -323,7 +323,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create();
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException(
new IoTDBException(
- "Subscription not enabled, please set config
`subscription_enabled` to true.",
+ "Subscription is not enabled.",
TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode()));
}
@@ -1977,7 +1977,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try {
if (!alterPipeStatement.getExtractorAttributes().isEmpty()) {
if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
-
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
+
PipeDataNodeAgent.plugin().validateSource(alterPipeStatement.getExtractorAttributes());
} else {
pipeMetaFromCoordinator
.getStaticMeta()
@@ -1985,7 +1985,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeStatement.getExtractorAttributes()));
PipeDataNodeAgent.plugin()
- .validateExtractor(
+ .validateSource(
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute());
}
}
@@ -2008,7 +2008,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (!alterPipeStatement.getConnectorAttributes().isEmpty()) {
if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
PipeDataNodeAgent.plugin()
- .validateConnector(pipeName,
alterPipeStatement.getConnectorAttributes());
+ .validateSource(pipeName,
alterPipeStatement.getConnectorAttributes());
} else {
pipeMetaFromCoordinator
.getStaticMeta()
@@ -2016,7 +2016,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeStatement.getConnectorAttributes()));
PipeDataNodeAgent.plugin()
- .validateConnector(
+ .validateSource(
pipeName,
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute());
}
@@ -2274,8 +2274,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final TopicMeta temporaryTopicMeta =
new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
try {
- PipeDataNodeAgent.plugin()
- .validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
+
PipeDataNodeAgent.plugin().validateSource(temporaryTopicMeta.generateExtractorAttributes());
PipeDataNodeAgent.plugin()
.validateProcessor(temporaryTopicMeta.generateProcessorAttributes());
} catch (Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 233164b4d1d..73fca57a1fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -31,12 +31,12 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {
public SubscriptionTaskSinkStage(
- String pipeName,
- long creationTime,
- PipeParameters pipeConnectorParameters,
- int regionId,
- PipeSinkSubtaskExecutor executor) {
- super(pipeName, creationTime, pipeConnectorParameters, regionId, () ->
executor);
+ final String pipeName,
+ final long creationTime,
+ final PipeParameters pipeSinkParameters,
+ final int regionId,
+ final PipeSinkSubtaskExecutor executor) {
+ super(pipeName, creationTime, pipeSinkParameters, regionId, () ->
executor);
}
@Override
@@ -45,7 +45,7 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
SubscriptionSinkSubtaskManager.instance()
.register(
executor.get(),
- pipeConnectorParameters,
+ pipeSinkParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime,
regionId));
}
@@ -70,7 +70,7 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
- public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
return SubscriptionSinkSubtaskManager.instance()
.getPipeConnectorPendingQueue(connectorSubtaskId);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 959870898a2..07def3ff4d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -64,10 +64,10 @@ public class SubscriptionSinkSubtaskManager {
public synchronized String register(
final PipeSinkSubtaskExecutor executor,
- final PipeParameters pipeConnectorParameters,
+ final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorKey =
- pipeConnectorParameters
+ pipeSinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -86,13 +86,13 @@ public class SubscriptionSinkSubtaskManager {
environment.getRegionId(),
connectorKey);
- boolean realTimeFirst =
- pipeConnectorParameters.getBooleanOrDefault(
+ final boolean realTimeFirst =
+ pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
- String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
+ String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters);
attributeSortedString = "__subscription_" + attributeSortedString;
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
@@ -101,36 +101,33 @@ public class SubscriptionSinkSubtaskManager {
? new PipeRealtimePriorityBlockingQueue()
: new UnboundedBlockingPendingQueue<>(new
PipeDataRegionEventCounter());
- final PipeConnector pipeConnector =
-
PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
+ final PipeConnector pipeSink =
+
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters);
// 1. Construct, validate and customize PipeConnector, and then
handshake (create connection)
// with the target
try {
- pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
- pipeConnector.customize(
- pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
- pipeConnector.handshake();
+ pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
+ pipeSink.customize(pipeSinkParameters, new
PipeTaskRuntimeConfiguration(environment));
+ pipeSink.handshake();
} catch (final Exception e) {
try {
- pipeConnector.close();
+ pipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
- "Failed to close connector after failed to initialize connector.
"
- + "Ignore this exception.",
+ "Failed to close sink after failed to initialize sink. " +
"Ignore this exception.",
closeException);
}
- throw new PipeException(
- "Failed to construct PipeConnector, because of " + e.getMessage(),
e);
+ throw new PipeException("Failed to construct PipeSink, because of " +
e.getMessage(), e);
}
// 2. Fetch topic and consumer group id from connector parameters
- final String topicName =
pipeConnectorParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
+ final String topicName =
pipeSinkParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
final String consumerGroupId =
-
pipeConnectorParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
+
pipeSinkParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
if (Objects.isNull(topicName) || Objects.isNull(consumerGroupId)) {
throw new SubscriptionException(
String.format(
- "Failed to construct subscription connector, because of %s or
%s does not exist in pipe connector parameters",
+ "Failed to construct subscription sink, because of %s or %s
does not exist in pipe connector parameters",
PipeSinkConstant.SINK_TOPIC_KEY,
PipeSinkConstant.SINK_CONSUMER_GROUP_KEY));
}
@@ -142,7 +139,7 @@ public class SubscriptionSinkSubtaskManager {
attributeSortedString,
0,
pendingQueue,
- pipeConnector,
+ pipeSink,
topicName,
consumerGroupId);
final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index aca42727082..caffb9ef859 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -53,57 +53,58 @@ public abstract class PipePluginAgent {
private final PipeProcessorConstructor pipeProcessorConstructor;
private final PipeSinkConstructor pipeSinkConstructor;
- protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+ protected PipePluginAgent(final PipePluginMetaKeeper pipePluginMetaKeeper) {
this.pipePluginMetaKeeper = pipePluginMetaKeeper;
- pipeSourceConstructor =
createPipeExtractorConstructor(pipePluginMetaKeeper);
+ pipeSourceConstructor = createPipeSourceConstructor(pipePluginMetaKeeper);
pipeProcessorConstructor =
createPipeProcessorConstructor(pipePluginMetaKeeper);
- pipeSinkConstructor = createPipeConnectorConstructor(pipePluginMetaKeeper);
+ pipeSinkConstructor = createPipeSinkConstructor(pipePluginMetaKeeper);
}
- protected abstract PipeSourceConstructor createPipeExtractorConstructor(
- PipePluginMetaKeeper pipePluginMetaKeeper);
+ protected abstract PipeSourceConstructor createPipeSourceConstructor(
+ final PipePluginMetaKeeper pipePluginMetaKeeper);
protected abstract PipeProcessorConstructor createPipeProcessorConstructor(
- PipePluginMetaKeeper pipePluginMetaKeeper);
+ final PipePluginMetaKeeper pipePluginMetaKeeper);
- protected abstract PipeSinkConstructor createPipeConnectorConstructor(
- PipePluginMetaKeeper pipePluginMetaKeeper);
+ protected abstract PipeSinkConstructor createPipeSinkConstructor(
+ final PipePluginMetaKeeper pipePluginMetaKeeper);
- public final PipeExtractor reflectExtractor(PipeParameters
extractorParameters) {
- return pipeSourceConstructor.reflectPlugin(extractorParameters);
+ public final PipeExtractor reflectSource(final PipeParameters
sourceParameters) {
+ return pipeSourceConstructor.reflectPlugin(sourceParameters);
}
- public final PipeProcessor reflectProcessor(PipeParameters
processorParameters) {
+ public final PipeProcessor reflectProcessor(final PipeParameters
processorParameters) {
return pipeProcessorConstructor.reflectPlugin(processorParameters);
}
- public final PipeConnector reflectConnector(PipeParameters
connectorParameters) {
- return pipeSinkConstructor.reflectPlugin(connectorParameters);
+ public final PipeConnector reflectSink(final PipeParameters sinkParameters) {
+ return pipeSinkConstructor.reflectPlugin(sinkParameters);
}
public void validate(
- String pipeName,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes)
+ final String pipeName,
+ final Map<String, String> sourceAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> sinkAttributes)
throws Exception {
- validateExtractor(extractorAttributes);
+ validateSource(sourceAttributes);
validateProcessor(processorAttributes);
- validateConnector(pipeName, connectorAttributes);
+ validateSink(pipeName, sinkAttributes);
}
- public void validateExtractor(Map<String, String> extractorAttributes)
throws Exception {
- final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
- final PipeExtractor temporaryExtractor =
reflectExtractor(extractorParameters);
+ public PipeExtractor validateSource(final Map<String, String>
sourceAttributes) throws Exception {
+ final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
+ final PipeExtractor temporaryExtractor = reflectSource(sourceParameters);
try {
- temporaryExtractor.validate(new
PipeParameterValidator(extractorParameters));
+ temporaryExtractor.validate(new
PipeParameterValidator(sourceParameters));
} finally {
try {
temporaryExtractor.close();
} catch (Exception e) {
- LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(),
e);
+ LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e);
}
}
+ return temporaryExtractor;
}
public void validateProcessor(Map<String, String> processorAttributes)
throws Exception {
@@ -120,23 +121,24 @@ public abstract class PipePluginAgent {
}
}
- public void validateConnector(String pipeName, Map<String, String>
connectorAttributes)
+ public PipeConnector validateSink(String pipeName, Map<String, String>
sinkAttributes)
throws Exception {
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
- final PipeConnector temporaryConnector =
reflectConnector(connectorParameters);
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+ final PipeConnector temporarySink = reflectSink(sinkParameters);
try {
- temporaryConnector.validate(new
PipeParameterValidator(connectorParameters));
- temporaryConnector.customize(
- connectorParameters,
+ temporarySink.validate(new PipeParameterValidator(sinkParameters));
+ temporarySink.customize(
+ sinkParameters,
new PipeTaskRuntimeConfiguration(new
PipeTaskTemporaryRuntimeEnvironment(pipeName)));
- temporaryConnector.handshake();
+ temporarySink.handshake();
} finally {
try {
- temporaryConnector.close();
+ temporarySink.close();
} catch (Exception e) {
LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(),
e);
}
}
+ return temporarySink;
}
/**