This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5977 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 058274e8bbd7ced459db21a1cd989f2f79d76c9a Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jun 8 20:57:33 2023 +0800 [IOTDB-5979] Pipe: validation and customization failures during the first run of the PipeProcessor will affect the creation of subsequent pipes --- .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 1 - .../db/pipe/task/stage/PipeTaskCollectorStage.java | 49 ++++++++-------------- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 42 +++++++------------ 3 files changed, 32 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java index 715816cfbcc..7ad1bffa2a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java @@ -58,7 +58,6 @@ public class PipeTaskBuilder { pipeStaticMeta.getPipeName(), dataRegionId, collectorStage.getEventSupplier(), - collectorStage.getCollectorPendingQueue(), pipeStaticMeta.getProcessorParameters(), connectorStage.getPipeConnectorPendingQueue()); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java index 9e03458bfa0..ca1ec8fda9b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java @@ -31,28 +31,12 @@ import org.apache.iotdb.pipe.api.PipeCollector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; import java.util.HashMap; public class PipeTaskCollectorStage extends PipeTaskStage { - private final PipeParameters collectorParameters; - - /** - * TODO: have a better way to control busy/idle status of PipeTaskCollectorStage. - * - * <p>Currently, this field is for IoTDBDataRegionCollector only. IoTDBDataRegionCollector uses - * collectorPendingQueue as an internal data structure to store realtime events. - * - * <p>PendingQueue can detect whether the queue is empty or not, and it can notify the - * PipeTaskProcessorStage to stop processing data when the queue is empty to avoid unnecessary - * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the - * queue is not empty. - */ - private UnboundedBlockingPendingQueue<Event> collectorPendingQueue; - private final PipeCollector pipeCollector; public PipeTaskCollectorStage( @@ -60,6 +44,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage { PipeTaskMeta pipeTaskMeta, long creationTime, PipeParameters collectorParameters) { + PipeParameters localizedCollectorParameters; + // TODO: avoid if-else, use reflection to create collector all the time if (collectorParameters .getStringOrDefault( @@ -69,40 +55,43 @@ public class PipeTaskCollectorStage extends PipeTaskStage { // we want to pass data region id to collector, so we need to create a new collector // parameters and put data region id into it. we can't put data region id into collector // parameters directly, because the given collector parameters may be used by other pipe task. - this.collectorParameters = + localizedCollectorParameters = new PipeParameters(new HashMap<>(collectorParameters.getAttribute())); // set data region id to collector parameters, so that collector can get data region id inside // collector - this.collectorParameters + localizedCollectorParameters .getAttribute() .put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId())); - collectorPendingQueue = new UnboundedBlockingPendingQueue<>(); this.pipeCollector = - new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, collectorPendingQueue); + new IoTDBDataRegionCollector( + pipeTaskMeta, creationTime, new UnboundedBlockingPendingQueue<>()); } else { - this.collectorParameters = collectorParameters; + localizedCollectorParameters = collectorParameters; - this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters); + this.pipeCollector = PipeAgent.plugin().reflectCollector(localizedCollectorParameters); } - } - @Override - public void createSubtask() throws PipeException { + // validate and customize should be called before createSubtask. this allows collector exposing + // exceptions in advance. try { // 1. validate collector parameters - pipeCollector.validate(new PipeParameterValidator(collectorParameters)); + pipeCollector.validate(new PipeParameterValidator(localizedCollectorParameters)); // 2. customize collector final PipeCollectorRuntimeConfiguration runtimeConfiguration = new PipeCollectorRuntimeConfiguration(); - pipeCollector.customize(collectorParameters, runtimeConfiguration); - // TODO: use runtimeConfiguration to configure collector + pipeCollector.customize(localizedCollectorParameters, runtimeConfiguration); } catch (Exception e) { throw new PipeException(e.getMessage(), e); } } + @Override + public void createSubtask() throws PipeException { + // do nothing + } + @Override public void startSubtask() throws PipeException { try { @@ -129,8 +118,4 @@ public class PipeTaskCollectorStage extends PipeTaskStage { public EventSupplier getEventSupplier() { return pipeCollector::supply; } - - public UnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() { - return collectorPendingQueue; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index 571b04773a4..02c6576ce94 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.config.PipeProcessorConstant; import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor; import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager; import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor; -import org.apache.iotdb.db.pipe.task.connection.BlockingPendingQueue; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; @@ -38,8 +37,6 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; -import javax.annotation.Nullable; - public class PipeTaskProcessorStage extends PipeTaskStage { protected final PipeProcessorSubtaskExecutor executor = @@ -49,15 +46,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage { protected final PipeProcessor pipeProcessor; protected final PipeProcessorSubtask pipeProcessorSubtask; - protected final BlockingPendingQueue<Event> pipeCollectorInputPendingQueue; - protected final BlockingPendingQueue<Event> pipeConnectorOutputPendingQueue; - /** * @param pipeName pipe name * @param dataRegionId data region id * @param pipeCollectorInputEventSupplier used to input events from pipe collector - * @param pipeCollectorInputPendingQueue used to listen whether pipe collector event queue is from - * empty to not empty or from not empty to empty, null means no need to listen * @param pipeProcessorParameters used to create pipe processor * @param pipeConnectorOutputPendingQueue used to output events to pipe connector */ @@ -65,12 +57,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage { String pipeName, TConsensusGroupId dataRegionId, EventSupplier pipeCollectorInputEventSupplier, - @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue, PipeParameters pipeProcessorParameters, BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) { this.pipeProcessorParameters = pipeProcessorParameters; - final String taskId = pipeName + "_" + dataRegionId; pipeProcessor = pipeProcessorParameters .getStringOrDefault( @@ -79,22 +69,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage { .equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()) ? new PipeDoNothingProcessor() : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters); - final PipeEventCollector pipeConnectorOutputEventCollector = - new PipeEventCollector(pipeConnectorOutputPendingQueue); - - this.pipeProcessorSubtask = - new PipeProcessorSubtask( - taskId, - pipeCollectorInputEventSupplier, - pipeProcessor, - pipeConnectorOutputEventCollector); - - this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue; - this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue; - } - - @Override - public void createSubtask() throws PipeException { + // validate and customize should be called before createSubtask. this allows collector exposing + // exceptions in advance. try { // 1. validate processor parameters pipeProcessor.validate(new PipeParameterValidator(pipeProcessorParameters)); @@ -103,11 +79,23 @@ public class PipeTaskProcessorStage extends PipeTaskStage { final PipeProcessorRuntimeConfiguration runtimeConfiguration = new PipeProcessorRuntimeConfiguration(); pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration); - // TODO: use runtimeConfiguration to configure processor } catch (Exception e) { throw new PipeException(e.getMessage(), e); } + final String taskId = pipeName + "_" + dataRegionId; + final PipeEventCollector pipeConnectorOutputEventCollector = + new PipeEventCollector(pipeConnectorOutputPendingQueue); + this.pipeProcessorSubtask = + new PipeProcessorSubtask( + taskId, + pipeCollectorInputEventSupplier, + pipeProcessor, + pipeConnectorOutputEventCollector); + } + + @Override + public void createSubtask() throws PipeException { executor.register(pipeProcessorSubtask); }
